24#ifndef GOBY_MIDDLEWARE_TRANSPORT_DETAIL_INTERFACE_H
25#define GOBY_MIDDLEWARE_TRANSPORT_DETAIL_INTERFACE_H
28#include <condition_variable>
57template <
typename Transporter,
typename InnerTransporter,
typename Enable =
void>
66 static_assert(std::is_void<Enable>::value,
"InnerTransporterInterface must be specialized");
70 static_assert(std::is_void<Enable>::value,
"InnerTransporterInterface must be specialized");
75template <
typename Transporter,
typename InnerTransporter>
77 Transporter, InnerTransporter,
78 typename
std::enable_if_t<!std::is_same<Transporter, NullTransporter>::value &&
79 !std::is_same<InnerTransporter, NullTransporter>::value>>
82 Transporter, InnerTransporter,
83 typename std::enable_if_t<!std::is_same<Transporter, NullTransporter>::value &&
84 !std::is_same<InnerTransporter, NullTransporter>::value>>;
88 InnerTransporter&
inner() {
return inner_; }
101 std::shared_ptr<InnerTransporter> own_inner_;
102 InnerTransporter& inner_;
106template <
typename Transporter,
typename InnerTransporter>
108 Transporter, InnerTransporter,
109 typename
std::enable_if_t<!std::is_same<Transporter, NullTransporter>::value &&
110 std::is_same<InnerTransporter, NullTransporter>::value>>
116 InnerTransporter&
inner() {
return inner_; }
117 Transporter&
innermost() {
return *
static_cast<Transporter*
>(
this); }
126 std::shared_ptr<InnerTransporter> own_inner_;
127 InnerTransporter& inner_;
131template <
typename Transporter,
typename InnerTransporter>
133 Transporter, InnerTransporter,
134 typename
std::enable_if_t<std::is_same<Transporter, NullTransporter>::value &&
135 std::is_same<InnerTransporter, NullTransporter>::value>>
147 template <
class Clock = std::chrono::system_clock,
class Duration =
typename Clock::duration>
148 int poll(
const std::chrono::time_point<Clock, Duration>& timeout =
149 std::chrono::time_point<Clock, Duration>::max());
155 template <
class Clock = std::chrono::system_clock,
class Duration =
typename Clock::duration>
156 int poll(Duration wait_for);
161 std::shared_ptr<std::mutex>
poll_mutex() {
return poll_mutex_; }
167 std::shared_ptr<std::condition_variable>
cv() {
return cv_; }
184 throw(
goby::Exception(
"Cannot attach PollerInterface with a different cv() and/or "
185 "poll_mutex(). Make sure the PollerInterface you are trying to "
186 "attach has the same innermost PollerInterface"));
188 std::lock_guard<std::mutex> lock(*poll_mutex_);
189 if (std::find(attached_pollers_.begin(), attached_pollers_.end(), poller) !=
190 attached_pollers_.end())
191 throw(
goby::Exception(
"Cannot attach the same PollerInterface more than once"));
192 attached_pollers_.push_back(poller);
197 std::shared_ptr<std::condition_variable>
cv)
203 template <
typename Transporter>
friend class Poller;
205 virtual int _transporter_poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock) = 0;
209 template <
class Clock = std::chrono::system_clock,
class Duration =
typename Clock::duration>
210 int _poll_all(
const std::chrono::time_point<Clock, Duration>& timeout);
212 std::shared_ptr<std::mutex> poll_mutex_;
214 std::shared_ptr<std::condition_variable> cv_;
217 std::vector<PollerInterface*> attached_pollers_;
232template <
typename Transporter,
typename InnerTransporter>
244 int scheme = transporter_scheme<Data, Transporter>()>
247 static_cast<Transporter*
>(
this)->
template check_validity<group>();
248 static_cast<Transporter*
>(
this)->
template publish_dynamic<Data, scheme>(data,
group,
263 int scheme = transporter_scheme<Data, Transporter>()>
264 void publish(std::shared_ptr<const Data> data,
267 static_cast<Transporter*
>(
this)->
template check_validity<group>();
268 static_cast<Transporter*
>(
this)->
template publish_dynamic<Data, scheme>(data,
group,
283 int scheme = transporter_scheme<Data, Transporter>()>
286 publish<group, Data, scheme>(std::shared_ptr<const Data>(data), publisher);
298 int scheme = transporter_scheme<Data, Transporter>(),
303 static_cast<Transporter*
>(
this)->
template check_validity<group>();
304 static_cast<Transporter*
>(
this)->
template subscribe_dynamic<Data, scheme>(f,
group,
317 int scheme = transporter_scheme<Data, Transporter>(),
319 void subscribe(std::function<
void(std::shared_ptr<const Data>)> f,
322 static_cast<Transporter*
>(
this)->
template check_validity<group>();
323 static_cast<Transporter*
>(
this)->
template subscribe_dynamic<Data, scheme>(f,
group,
336 template <const Group& group, Necessity necessity = Necessity::OPTIONAL,
typename Func>
341 typename std::decay<detail::first_argument<Func>>::type>::type;
343 subscribe<group, Data, transporter_scheme<Data, Transporter>(), necessity>(f);
352 int scheme = transporter_scheme<Data, Transporter>()>
355 static_cast<Transporter*
>(
this)->
template check_validity<group>();
356 static_cast<Transporter*
>(
this)->
template unsubscribe_dynamic<Data, scheme>(
group,
374template <
class Clock,
class Duration>
377 return _poll_all(timeout);
380template <
class Clock,
class Duration>
383 if (wait_for == Duration::max())
386 return poll(Clock::now() + wait_for);
389template <
class Clock,
class Duration>
390int goby::middleware::PollerInterface::_poll_all(
391 const std::chrono::time_point<Clock, Duration>& timeout)
394 std::unique_ptr<std::unique_lock<std::mutex>> lock(
395 new std::unique_lock<std::mutex>(*poll_mutex_));
398 auto poll_all_once = [
this, &lock]()
400 int poll_items = _transporter_poll(lock);
401 for (
auto* poller : attached_pollers_) poll_items += poller->_transporter_poll(lock);
404 int poll_items = poll_all_once();
405 while (poll_items == 0)
409 "Poller lock was released by poll() but no poll items were returned"));
411 if (timeout == Clock::time_point::max())
414 poll_items = poll_all_once();
425 if (cv_->wait_until(*lock, timeout) == std::cv_status::no_timeout)
426 poll_items = poll_all_once();
simple exception class for goby applications
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
InnerTransporterInterface()
Generate a local instantiation of the inner transporter.
InnerTransporter & inner()
InnerTransporterInterface(InnerTransporter &inner)
Pass in an external inner transporter for use.
Transporter & innermost()
InnerTransporter InnerTransporterType
the InnerTransporter type (accessible for other uses)
Real transporter that has a real inner transporter.
InnerTransporter InnerTransporterType
the InnerTransporter type (accessible for other uses)
InnerTransporter & inner()
InnerTransporterInterface()
Generate a local instantiation of the inner transporter.
InnerTransporterInterface(InnerTransporter &inner)
Pass in an external inner transporter for use.
Recursive inner layer transporter storage or generator.
InnerTransporter InnerTransporterType
the InnerTransporter type (accessible for other uses)
InnerTransporter & inner()
Defines the common interface for polling for data on Goby transporters.
PollerInterface(std::shared_ptr< std::mutex > poll_mutex, std::shared_ptr< std::condition_variable > cv)
std::shared_ptr< std::mutex > poll_mutex()
access the mutex used for poll synchronization
int poll(const std::chrono::time_point< Clock, Duration > &timeout=std::chrono::time_point< Clock, Duration >::max())
poll for data. Blocks until a data event occurs or a timeout when a particular time has been reached
void attach(PollerInterface *poller)
Attach another PollerInterface to this one so that its _transporter_poll() is also called during _pol...
std::shared_ptr< std::condition_variable > cv()
access the condition variable used for poll synchronization
Utility class for allowing the various Goby middleware transporters to poll the underlying transport ...
Class that holds additional metadata and callback functions related to a publication (and is optional...
Defines the common interface for publishing and subscribing data using static (constexpr) groups on G...
void subscribe(Func f)
Simplified version of subscribe() that can deduce Data from the first argument of the function (lambd...
void subscribe(std::function< void(const Data &)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (const reference variant)
StaticTransporterInterface(InnerTransporter &inner)
void unsubscribe(const Subscriber< Data > &subscriber=Subscriber< Data >())
Unsubscribe to a specific group and data type.
void unsubscribe_all()
Unsubscribe to all messages that this transporter has subscribed to.
StaticTransporterInterface()
void publish(std::shared_ptr< const Data > data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (shared pointer to const data variant)
void publish(std::shared_ptr< Data > data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (shared pointer to mutable data variant)
void subscribe(std::function< void(std::shared_ptr< const Data >)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (shared pointer variant)
void publish(const Data &data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (const reference variant)
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
goby::util::logger::GroupSetter group(std::string n)
constexpr int scheme()
Placeholder to provide an interface for the scheme() function family.
Necessity
Used to tag subscriptions based on their necessity (e.g. required for correct functioning,...
The global namespace for the Goby project.