Goby3 3.4.0
2026.04.13
Loading...
Searching...
No Matches
interface.h
Go to the documentation of this file.
1// Copyright 2017-2026:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6//
7//
8// This file is part of the Goby Underwater Autonomy Project Libraries
9// ("The Goby Libraries").
10//
11// The Goby Libraries are free software: you can redistribute them and/or modify
12// them under the terms of the GNU Lesser General Public License as published by
13// the Free Software Foundation, either version 2.1 of the License, or
14// (at your option) any later version.
15//
16// The Goby Libraries are distributed in the hope that they will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU Lesser General Public License for more details.
20//
21// You should have received a copy of the GNU Lesser General Public License
22// along with Goby. If not, see <http://www.gnu.org/licenses/>.
23
24#ifndef GOBY_MIDDLEWARE_TRANSPORT_DETAIL_INTERFACE_H
25#define GOBY_MIDDLEWARE_TRANSPORT_DETAIL_INTERFACE_H
26
27#include <chrono>
28#include <condition_variable>
29#include <memory>
30#include <mutex>
31#include <vector>
32
35
36#include "goby/exception.h"
44
45namespace goby
46{
47namespace middleware
48{
49class NullTransporter;
50
57template <typename Transporter, typename InnerTransporter, typename Enable = void>
59{
60 public:
62 using InnerTransporterType = InnerTransporter;
64 InnerTransporter& inner()
65 {
66 static_assert(std::is_void<Enable>::value, "InnerTransporterInterface must be specialized");
67 }
68 auto innermost()
69 {
70 static_assert(std::is_void<Enable>::value, "InnerTransporterInterface must be specialized");
71 }
72};
73
75template <typename Transporter, typename InnerTransporter>
77 Transporter, InnerTransporter,
78 typename std::enable_if_t<!std::is_same<Transporter, NullTransporter>::value &&
79 !std::is_same<InnerTransporter, NullTransporter>::value>>
80{
82 Transporter, InnerTransporter,
83 typename std::enable_if_t<!std::is_same<Transporter, NullTransporter>::value &&
84 !std::is_same<InnerTransporter, NullTransporter>::value>>;
85
86 public:
88 InnerTransporter& inner() { return inner_; }
89 auto& innermost() { return inner_.innermost(); }
90
92 using InnerTransporterType = InnerTransporter;
93
94 protected:
96 InnerTransporterInterface(InnerTransporter& inner) : inner_(inner) {}
98 InnerTransporterInterface() : own_inner_(new InnerTransporter), inner_(*own_inner_) {}
99
100 private:
101 std::shared_ptr<InnerTransporter> own_inner_;
102 InnerTransporter& inner_;
103};
104
106template <typename Transporter, typename InnerTransporter>
108 Transporter, InnerTransporter,
109 typename std::enable_if_t<!std::is_same<Transporter, NullTransporter>::value &&
110 std::is_same<InnerTransporter, NullTransporter>::value>>
111{
112 public:
114 using InnerTransporterType = InnerTransporter;
116 InnerTransporter& inner() { return inner_; }
117 Transporter& innermost() { return *static_cast<Transporter*>(this); }
118
119 protected:
121 InnerTransporterInterface(InnerTransporter& inner) : inner_(inner) {}
123 InnerTransporterInterface() : own_inner_(new InnerTransporter), inner_(*own_inner_) {}
124
125 private:
126 std::shared_ptr<InnerTransporter> own_inner_;
127 InnerTransporter& inner_;
128};
129
131template <typename Transporter, typename InnerTransporter>
133 Transporter, InnerTransporter,
134 typename std::enable_if_t<std::is_same<Transporter, NullTransporter>::value &&
135 std::is_same<InnerTransporter, NullTransporter>::value>>
136{
137};
138
141{
142 public:
147 template <class Clock = std::chrono::system_clock, class Duration = typename Clock::duration>
148 int poll(const std::chrono::time_point<Clock, Duration>& timeout =
149 std::chrono::time_point<Clock, Duration>::max());
150
155 template <class Clock = std::chrono::system_clock, class Duration = typename Clock::duration>
156 int poll(Duration wait_for);
157
161 std::shared_ptr<std::mutex> poll_mutex() { return poll_mutex_; }
162
167 std::shared_ptr<std::condition_variable> cv() { return cv_; }
168
176 {
177 if (!poller)
178 throw(goby::Exception("Cannot attach a null PollerInterface"));
179
180 if (poller == this)
181 throw(goby::Exception("Cannot attach a PollerInterface to itself"));
182
183 if (poller->cv() != cv() || poller->poll_mutex() != poll_mutex())
184 throw(goby::Exception("Cannot attach PollerInterface with a different cv() and/or "
185 "poll_mutex(). Make sure the PollerInterface you are trying to "
186 "attach has the same innermost PollerInterface"));
187
188 std::lock_guard<std::mutex> lock(*poll_mutex_);
189 if (std::find(attached_pollers_.begin(), attached_pollers_.end(), poller) !=
190 attached_pollers_.end())
191 throw(goby::Exception("Cannot attach the same PollerInterface more than once"));
192 attached_pollers_.push_back(poller);
193 }
194
195 protected:
196 PollerInterface(std::shared_ptr<std::mutex> poll_mutex,
197 std::shared_ptr<std::condition_variable> cv)
198 : poll_mutex_(poll_mutex), cv_(cv)
199 {
200 }
201
202 private:
203 template <typename Transporter> friend class Poller;
204 // poll the transporter for data
205 virtual int _transporter_poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock) = 0;
206
207 private:
208 // poll all the transporters for data, including a timeout (only called by the outside-most Poller)
209 template <class Clock = std::chrono::system_clock, class Duration = typename Clock::duration>
210 int _poll_all(const std::chrono::time_point<Clock, Duration>& timeout);
211
212 std::shared_ptr<std::mutex> poll_mutex_;
213 // signaled when there's no data for this thread to read during _poll()
214 std::shared_ptr<std::condition_variable> cv_;
215 // non-owning pointers to additional PollerInterface instances to poll alongside this one;
216 // attached pollers must remain valid for the lifetime of this instance
217 std::vector<PollerInterface*> attached_pollers_;
218};
219
221enum class Necessity
222{
223 REQUIRED,
226};
227
232template <typename Transporter, typename InnerTransporter>
233class StaticTransporterInterface : public InnerTransporterInterface<Transporter, InnerTransporter>
234{
235 public:
243 template <const Group& group, typename Data,
244 int scheme = transporter_scheme<Data, Transporter>()>
245 void publish(const Data& data, const Publisher<Data>& publisher = Publisher<Data>())
246 {
247 static_cast<Transporter*>(this)->template check_validity<group>();
248 static_cast<Transporter*>(this)->template publish_dynamic<Data, scheme>(data, group,
249 publisher);
250 }
251
262 template <const Group& group, typename Data,
263 int scheme = transporter_scheme<Data, Transporter>()>
264 void publish(std::shared_ptr<const Data> data,
265 const Publisher<Data>& publisher = Publisher<Data>())
266 {
267 static_cast<Transporter*>(this)->template check_validity<group>();
268 static_cast<Transporter*>(this)->template publish_dynamic<Data, scheme>(data, group,
269 publisher);
270 }
271
282 template <const Group& group, typename Data,
283 int scheme = transporter_scheme<Data, Transporter>()>
284 void publish(std::shared_ptr<Data> data, const Publisher<Data>& publisher = Publisher<Data>())
285 {
286 publish<group, Data, scheme>(std::shared_ptr<const Data>(data), publisher);
287 }
288
297 template <const Group& group, typename Data,
298 int scheme = transporter_scheme<Data, Transporter>(),
299 Necessity necessity = Necessity::OPTIONAL>
300 void subscribe(std::function<void(const Data&)> f,
301 const Subscriber<Data>& subscriber = Subscriber<Data>())
302 {
303 static_cast<Transporter*>(this)->template check_validity<group>();
304 static_cast<Transporter*>(this)->template subscribe_dynamic<Data, scheme>(f, group,
305 subscriber);
306 }
307
316 template <const Group& group, typename Data,
317 int scheme = transporter_scheme<Data, Transporter>(),
318 Necessity necessity = Necessity::OPTIONAL>
319 void subscribe(std::function<void(std::shared_ptr<const Data>)> f,
320 const Subscriber<Data>& subscriber = Subscriber<Data>())
321 {
322 static_cast<Transporter*>(this)->template check_validity<group>();
323 static_cast<Transporter*>(this)->template subscribe_dynamic<Data, scheme>(f, group,
324 subscriber);
325 }
326
336 template <const Group& group, Necessity necessity = Necessity::OPTIONAL, typename Func>
337 void subscribe(Func f)
338 {
339 // we want to grab the first argument of "f" and then capture "Data" from "const Data& data" and "std::shared_ptr<const Data>"
340 using Data = typename detail::primitive_type<
341 typename std::decay<detail::first_argument<Func>>::type>::type;
342
343 subscribe<group, Data, transporter_scheme<Data, Transporter>(), necessity>(f);
344 }
345
351 template <const Group& group, typename Data,
352 int scheme = transporter_scheme<Data, Transporter>()>
354 {
355 static_cast<Transporter*>(this)->template check_validity<group>();
356 static_cast<Transporter*>(this)->template unsubscribe_dynamic<Data, scheme>(group,
357 subscriber);
358 }
359
361 void unsubscribe_all() { static_cast<Transporter*>(this)->template unsubscribe_all(); }
362
363 protected:
365 : InnerTransporterInterface<Transporter, InnerTransporter>(inner)
366 {
367 }
369};
370
371} // namespace middleware
372} // namespace goby
373
374template <class Clock, class Duration>
375int goby::middleware::PollerInterface::poll(const std::chrono::time_point<Clock, Duration>& timeout)
376{
377 return _poll_all(timeout);
378}
379
380template <class Clock, class Duration>
382{
383 if (wait_for == Duration::max())
384 return poll();
385 else
386 return poll(Clock::now() + wait_for);
387}
388
389template <class Clock, class Duration>
390int goby::middleware::PollerInterface::_poll_all(
391 const std::chrono::time_point<Clock, Duration>& timeout)
392{
393 // hold this lock until either we find a polled item or we wait on the condition variable
394 std::unique_ptr<std::unique_lock<std::mutex>> lock(
395 new std::unique_lock<std::mutex>(*poll_mutex_));
396 // std::cout << std::this_thread::get_id() << " _poll_all locking: " << poll_mutex_.get() << std::endl;
397
398 auto poll_all_once = [this, &lock]()
399 {
400 int poll_items = _transporter_poll(lock);
401 for (auto* poller : attached_pollers_) poll_items += poller->_transporter_poll(lock);
402 return poll_items;
403 };
404 int poll_items = poll_all_once();
405 while (poll_items == 0)
406 {
407 if (!lock)
408 throw(goby::Exception(
409 "Poller lock was released by poll() but no poll items were returned"));
410
411 if (timeout == Clock::time_point::max())
412 {
413 cv_->wait(*lock); // wait_until doesn't work well with time_point::max()
414 poll_items = poll_all_once();
415
416 // TODO: fix this message now that zeromq::InterProcessPortal can have
417 // a condition_variable trigger for REQUEST_HOLD_STATE
418 //if (poll_items == 0)
419 // goby::glog.is(goby::util::logger::DEBUG3) &&
420 // goby::glog << "PollerInterface condition_variable: no data (spurious?) wakeup"
421 // << std::endl;
422 }
423 else
424 {
425 if (cv_->wait_until(*lock, timeout) == std::cv_status::no_timeout)
426 poll_items = poll_all_once();
427 else
428 return poll_items;
429 }
430 }
431
432 return poll_items;
433}
434
435#endif
simple exception class for goby applications
Definition exception.h:35
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:60
Recursive inner layer transporter storage or generator.
Definition interface.h:59
InnerTransporter InnerTransporterType
the InnerTransporter type (accessible for other uses)
Definition interface.h:62
Defines the common interface for polling for data on Goby transporters.
Definition interface.h:141
PollerInterface(std::shared_ptr< std::mutex > poll_mutex, std::shared_ptr< std::condition_variable > cv)
Definition interface.h:196
std::shared_ptr< std::mutex > poll_mutex()
access the mutex used for poll synchronization
Definition interface.h:161
int poll(const std::chrono::time_point< Clock, Duration > &timeout=std::chrono::time_point< Clock, Duration >::max())
poll for data. Blocks until a data event occurs or a timeout when a particular time has been reached
Definition interface.h:375
void attach(PollerInterface *poller)
Attach another PollerInterface to this one so that its _transporter_poll() is also called during _pol...
Definition interface.h:175
std::shared_ptr< std::condition_variable > cv()
access the condition variable used for poll synchronization
Definition interface.h:167
Utility class for allowing the various Goby middleware transporters to poll the underlying transport ...
Definition poller.h:38
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition publisher.h:40
Defines the common interface for publishing and subscribing data using static (constexpr) groups on G...
Definition interface.h:234
void subscribe(Func f)
Simplified version of subscribe() that can deduce Data from the first argument of the function (lambd...
Definition interface.h:337
void subscribe(std::function< void(const Data &)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (const reference variant)
Definition interface.h:300
StaticTransporterInterface(InnerTransporter &inner)
Definition interface.h:364
void unsubscribe(const Subscriber< Data > &subscriber=Subscriber< Data >())
Unsubscribe to a specific group and data type.
Definition interface.h:353
void unsubscribe_all()
Unsubscribe to all messages that this transporter has subscribed to.
Definition interface.h:361
void publish(std::shared_ptr< const Data > data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (shared pointer to const data variant)
Definition interface.h:264
void publish(std::shared_ptr< Data > data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (shared pointer to mutable data variant)
Definition interface.h:284
void subscribe(std::function< void(std::shared_ptr< const Data >)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (shared pointer variant)
Definition interface.h:319
void publish(const Data &data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (const reference variant)
Definition interface.h:245
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
Definition subscriber.h:37
goby::util::logger::GroupSetter group(std::string n)
constexpr int scheme()
Placeholder to provide an interface for the scheme() function family.
Definition cstr.h:65
Necessity
Used to tag subscriptions based on their necessity (e.g. required for correct functioning,...
Definition interface.h:222
The global namespace for the Goby project.
STL namespace.