25#ifndef GOBY_MIDDLEWARE_TRANSPORT_DETAIL_INTERFACE_H
26#define GOBY_MIDDLEWARE_TRANSPORT_DETAIL_INTERFACE_H
29#include <condition_variable>
58template <
typename Transporter,
typename InnerTransporter,
typename Enable =
void>
67 static_assert(std::is_void<Enable>::value,
"InnerTransporterInterface must be specialized");
71 static_assert(std::is_void<Enable>::value,
"InnerTransporterInterface must be specialized");
76template <
typename Transporter,
typename InnerTransporter>
78 Transporter, InnerTransporter,
79 typename
std::enable_if_t<!std::is_same<Transporter, NullTransporter>::value &&
80 !std::is_same<InnerTransporter, NullTransporter>::value>>
83 Transporter, InnerTransporter,
84 typename std::enable_if_t<!std::is_same<Transporter, NullTransporter>::value &&
85 !std::is_same<InnerTransporter, NullTransporter>::value>>;
89 InnerTransporter&
inner() {
return inner_; }
102 std::shared_ptr<InnerTransporter> own_inner_;
103 InnerTransporter& inner_;
107template <
typename Transporter,
typename InnerTransporter>
109 Transporter, InnerTransporter,
110 typename
std::enable_if_t<!std::is_same<Transporter, NullTransporter>::value &&
111 std::is_same<InnerTransporter, NullTransporter>::value>>
117 InnerTransporter&
inner() {
return inner_; }
118 Transporter&
innermost() {
return *
static_cast<Transporter*
>(
this); }
127 std::shared_ptr<InnerTransporter> own_inner_;
128 InnerTransporter& inner_;
132template <
typename Transporter,
typename InnerTransporter>
134 Transporter, InnerTransporter,
135 typename
std::enable_if_t<std::is_same<Transporter, NullTransporter>::value &&
136 std::is_same<InnerTransporter, NullTransporter>::value>>
148 template <
class Clock = std::chrono::system_clock,
class Duration =
typename Clock::duration>
149 int poll(
const std::chrono::time_point<Clock, Duration>& timeout =
150 std::chrono::time_point<Clock, Duration>::max());
156 template <
class Clock = std::chrono::system_clock,
class Duration =
typename Clock::duration>
157 int poll(Duration wait_for);
162 std::shared_ptr<std::mutex>
poll_mutex() {
return poll_mutex_; }
168 std::shared_ptr<std::condition_variable>
cv() {
return cv_; }
185 throw(
goby::Exception(
"Cannot attach PollerInterface with a different cv() and/or "
186 "poll_mutex(). Make sure the PollerInterface you are trying to "
187 "attach has the same innermost PollerInterface"));
189 std::lock_guard<std::mutex> lock(*poll_mutex_);
190 if (std::find(attached_pollers_.begin(), attached_pollers_.end(), poller) !=
191 attached_pollers_.end())
192 throw(
goby::Exception(
"Cannot attach the same PollerInterface more than once"));
193 attached_pollers_.push_back(poller);
198 std::shared_ptr<std::condition_variable>
cv)
204 template <
typename Transporter>
friend class Poller;
206 virtual int _transporter_poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock) = 0;
210 template <
class Clock = std::chrono::system_clock,
class Duration =
typename Clock::duration>
211 int _poll_all(
const std::chrono::time_point<Clock, Duration>& timeout);
213 std::shared_ptr<std::mutex> poll_mutex_;
215 std::shared_ptr<std::condition_variable> cv_;
218 std::vector<PollerInterface*> attached_pollers_;
233template <
typename Transporter,
typename InnerTransporter>
245 int scheme = transporter_scheme<Data, Transporter>()>
248 static_cast<Transporter*
>(
this)->
template check_validity<group>();
249 static_cast<Transporter*
>(
this)->
template publish_dynamic<Data, scheme>(data,
group,
264 int scheme = transporter_scheme<Data, Transporter>()>
265 void publish(std::shared_ptr<const Data> data,
268 static_cast<Transporter*
>(
this)->
template check_validity<group>();
269 static_cast<Transporter*
>(
this)->
template publish_dynamic<Data, scheme>(data,
group,
284 int scheme = transporter_scheme<Data, Transporter>()>
287 publish<group, Data, scheme>(std::shared_ptr<const Data>(data), publisher);
299 int scheme = transporter_scheme<Data, Transporter>(),
304 static_cast<Transporter*
>(
this)->
template check_validity<group>();
305 static_cast<Transporter*
>(
this)->
template subscribe_dynamic<Data, scheme>(f,
group,
318 int scheme = transporter_scheme<Data, Transporter>(),
320 void subscribe(std::function<
void(std::shared_ptr<const Data>)> f,
323 static_cast<Transporter*
>(
this)->
template check_validity<group>();
324 static_cast<Transporter*
>(
this)->
template subscribe_dynamic<Data, scheme>(f,
group,
337 template <const Group& group, Necessity necessity = Necessity::OPTIONAL,
typename Func>
342 typename std::decay<detail::first_argument<Func>>::type>::type;
344 subscribe<group, Data, transporter_scheme<Data, Transporter>(), necessity>(f);
353 int scheme = transporter_scheme<Data, Transporter>()>
356 static_cast<Transporter*
>(
this)->
template check_validity<group>();
357 static_cast<Transporter*
>(
this)->
template unsubscribe_dynamic<Data, scheme>(
group,
375template <
class Clock,
class Duration>
378 return _poll_all(timeout);
381template <
class Clock,
class Duration>
384 if (wait_for == Duration::max())
387 return poll(Clock::now() + wait_for);
390template <
class Clock,
class Duration>
391int goby::middleware::PollerInterface::_poll_all(
392 const std::chrono::time_point<Clock, Duration>& timeout)
395 std::unique_ptr<std::unique_lock<std::mutex>> lock(
396 new std::unique_lock<std::mutex>(*poll_mutex_));
399 auto poll_all_once = [
this, &lock]()
401 int poll_items = _transporter_poll(lock);
402 for (
auto* poller : attached_pollers_) poll_items += poller->_transporter_poll(lock);
405 int poll_items = poll_all_once();
406 while (poll_items == 0)
410 "Poller lock was released by poll() but no poll items were returned"));
412 if (timeout == Clock::time_point::max())
415 poll_items = poll_all_once();
426 if (cv_->wait_until(*lock, timeout) == std::cv_status::no_timeout)
427 poll_items = poll_all_once();
simple exception class for goby applications
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
InnerTransporterInterface()
Generate a local instantiation of the inner transporter.
InnerTransporter & inner()
InnerTransporterInterface(InnerTransporter &inner)
Pass in an external inner transporter for use.
Transporter & innermost()
InnerTransporter InnerTransporterType
the InnerTransporter type (accessible for other uses)
Real transporter that has a real inner transporter.
InnerTransporter InnerTransporterType
the InnerTransporter type (accessible for other uses)
InnerTransporter & inner()
InnerTransporterInterface()
Generate a local instantiation of the inner transporter.
InnerTransporterInterface(InnerTransporter &inner)
Pass in an external inner transporter for use.
Recursive inner layer transporter storage or generator.
InnerTransporter InnerTransporterType
the InnerTransporter type (accessible for other uses)
InnerTransporter & inner()
Defines the common interface for polling for data on Goby transporters.
PollerInterface(std::shared_ptr< std::mutex > poll_mutex, std::shared_ptr< std::condition_variable > cv)
std::shared_ptr< std::mutex > poll_mutex()
access the mutex used for poll synchronization
int poll(const std::chrono::time_point< Clock, Duration > &timeout=std::chrono::time_point< Clock, Duration >::max())
poll for data. Blocks until a data event occurs or a timeout when a particular time has been reached
void attach(PollerInterface *poller)
Attach another PollerInterface to this one so that its _transporter_poll() is also called during _pol...
std::shared_ptr< std::condition_variable > cv()
access the condition variable used for poll synchronization
Utility class for allowing the various Goby middleware transporters to poll the underlying transport ...
Class that holds additional metadata and callback functions related to a publication (and is optional...
Defines the common interface for publishing and subscribing data using static (constexpr) groups on G...
void subscribe(Func f)
Simplified version of subscribe() that can deduce Data from the first argument of the function (lambd...
void subscribe(std::function< void(const Data &)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (const reference variant)
StaticTransporterInterface(InnerTransporter &inner)
void unsubscribe(const Subscriber< Data > &subscriber=Subscriber< Data >())
Unsubscribe to a specific group and data type.
void unsubscribe_all()
Unsubscribe to all messages that this transporter has subscribed to.
StaticTransporterInterface()
void publish(std::shared_ptr< const Data > data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (shared pointer to const data variant)
void publish(std::shared_ptr< Data > data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (shared pointer to mutable data variant)
void subscribe(std::function< void(std::shared_ptr< const Data >)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (shared pointer variant)
void publish(const Data &data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (const reference variant)
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
goby::util::logger::GroupSetter group(std::string n)
constexpr int scheme()
Placeholder to provide an interface for the scheme() function family.
Necessity
Used to tag subscriptions based on their necessity (e.g. required for correct functioning,...
The global namespace for the Goby project.