Goby3 3.5.1
2026.06.04
Loading...
Searching...
No Matches
interface.h
Go to the documentation of this file.
1// Copyright 2017-2026:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6// Copilot <198982749+Copilot@users.noreply.github.com>
7//
8//
9// This file is part of the Goby Underwater Autonomy Project Libraries
10// ("The Goby Libraries").
11//
12// The Goby Libraries are free software: you can redistribute them and/or modify
13// them under the terms of the GNU Lesser General Public License as published by
14// the Free Software Foundation, either version 2.1 of the License, or
15// (at your option) any later version.
16//
17// The Goby Libraries are distributed in the hope that they will be useful,
18// but WITHOUT ANY WARRANTY; without even the implied warranty of
19// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20// GNU Lesser General Public License for more details.
21//
22// You should have received a copy of the GNU Lesser General Public License
23// along with Goby. If not, see <http://www.gnu.org/licenses/>.
24
25#ifndef GOBY_MIDDLEWARE_TRANSPORT_DETAIL_INTERFACE_H
26#define GOBY_MIDDLEWARE_TRANSPORT_DETAIL_INTERFACE_H
27
28#include <chrono>
29#include <condition_variable>
30#include <memory>
31#include <mutex>
32#include <vector>
33
36
37#include "goby/exception.h"
45
46namespace goby
47{
48namespace middleware
49{
50class NullTransporter;
51
58template <typename Transporter, typename InnerTransporter, typename Enable = void>
60{
61 public:
63 using InnerTransporterType = InnerTransporter;
65 InnerTransporter& inner()
66 {
67 static_assert(std::is_void<Enable>::value, "InnerTransporterInterface must be specialized");
68 }
69 auto innermost()
70 {
71 static_assert(std::is_void<Enable>::value, "InnerTransporterInterface must be specialized");
72 }
73};
74
76template <typename Transporter, typename InnerTransporter>
78 Transporter, InnerTransporter,
79 typename std::enable_if_t<!std::is_same<Transporter, NullTransporter>::value &&
80 !std::is_same<InnerTransporter, NullTransporter>::value>>
81{
83 Transporter, InnerTransporter,
84 typename std::enable_if_t<!std::is_same<Transporter, NullTransporter>::value &&
85 !std::is_same<InnerTransporter, NullTransporter>::value>>;
86
87 public:
89 InnerTransporter& inner() { return inner_; }
90 auto& innermost() { return inner_.innermost(); }
91
93 using InnerTransporterType = InnerTransporter;
94
95 protected:
97 InnerTransporterInterface(InnerTransporter& inner) : inner_(inner) {}
99 InnerTransporterInterface() : own_inner_(new InnerTransporter), inner_(*own_inner_) {}
100
101 private:
102 std::shared_ptr<InnerTransporter> own_inner_;
103 InnerTransporter& inner_;
104};
105
107template <typename Transporter, typename InnerTransporter>
109 Transporter, InnerTransporter,
110 typename std::enable_if_t<!std::is_same<Transporter, NullTransporter>::value &&
111 std::is_same<InnerTransporter, NullTransporter>::value>>
112{
113 public:
115 using InnerTransporterType = InnerTransporter;
117 InnerTransporter& inner() { return inner_; }
118 Transporter& innermost() { return *static_cast<Transporter*>(this); }
119
120 protected:
122 InnerTransporterInterface(InnerTransporter& inner) : inner_(inner) {}
124 InnerTransporterInterface() : own_inner_(new InnerTransporter), inner_(*own_inner_) {}
125
126 private:
127 std::shared_ptr<InnerTransporter> own_inner_;
128 InnerTransporter& inner_;
129};
130
132template <typename Transporter, typename InnerTransporter>
134 Transporter, InnerTransporter,
135 typename std::enable_if_t<std::is_same<Transporter, NullTransporter>::value &&
136 std::is_same<InnerTransporter, NullTransporter>::value>>
137{
138};
139
142{
143 public:
148 template <class Clock = std::chrono::system_clock, class Duration = typename Clock::duration>
149 int poll(const std::chrono::time_point<Clock, Duration>& timeout =
150 std::chrono::time_point<Clock, Duration>::max());
151
156 template <class Clock = std::chrono::system_clock, class Duration = typename Clock::duration>
157 int poll(Duration wait_for);
158
162 std::shared_ptr<std::mutex> poll_mutex() { return poll_mutex_; }
163
168 std::shared_ptr<std::condition_variable> cv() { return cv_; }
169
177 {
178 if (!poller)
179 throw(goby::Exception("Cannot attach a null PollerInterface"));
180
181 if (poller == this)
182 throw(goby::Exception("Cannot attach a PollerInterface to itself"));
183
184 if (poller->cv() != cv() || poller->poll_mutex() != poll_mutex())
185 throw(goby::Exception("Cannot attach PollerInterface with a different cv() and/or "
186 "poll_mutex(). Make sure the PollerInterface you are trying to "
187 "attach has the same innermost PollerInterface"));
188
189 std::lock_guard<std::mutex> lock(*poll_mutex_);
190 if (std::find(attached_pollers_.begin(), attached_pollers_.end(), poller) !=
191 attached_pollers_.end())
192 throw(goby::Exception("Cannot attach the same PollerInterface more than once"));
193 attached_pollers_.push_back(poller);
194 }
195
196 protected:
197 PollerInterface(std::shared_ptr<std::mutex> poll_mutex,
198 std::shared_ptr<std::condition_variable> cv)
199 : poll_mutex_(poll_mutex), cv_(cv)
200 {
201 }
202
203 private:
204 template <typename Transporter> friend class Poller;
205 // poll the transporter for data
206 virtual int _transporter_poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock) = 0;
207
208 private:
209 // poll all the transporters for data, including a timeout (only called by the outside-most Poller)
210 template <class Clock = std::chrono::system_clock, class Duration = typename Clock::duration>
211 int _poll_all(const std::chrono::time_point<Clock, Duration>& timeout);
212
213 std::shared_ptr<std::mutex> poll_mutex_;
214 // signaled when there's no data for this thread to read during _poll()
215 std::shared_ptr<std::condition_variable> cv_;
216 // non-owning pointers to additional PollerInterface instances to poll alongside this one;
217 // attached pollers must remain valid for the lifetime of this instance
218 std::vector<PollerInterface*> attached_pollers_;
219};
220
222enum class Necessity
223{
224 REQUIRED,
227};
228
233template <typename Transporter, typename InnerTransporter>
234class StaticTransporterInterface : public InnerTransporterInterface<Transporter, InnerTransporter>
235{
236 public:
244 template <const Group& group, typename Data,
245 int scheme = transporter_scheme<Data, Transporter>()>
246 void publish(const Data& data, const Publisher<Data>& publisher = Publisher<Data>())
247 {
248 static_cast<Transporter*>(this)->template check_validity<group>();
249 static_cast<Transporter*>(this)->template publish_dynamic<Data, scheme>(data, group,
250 publisher);
251 }
252
263 template <const Group& group, typename Data,
264 int scheme = transporter_scheme<Data, Transporter>()>
265 void publish(std::shared_ptr<const Data> data,
266 const Publisher<Data>& publisher = Publisher<Data>())
267 {
268 static_cast<Transporter*>(this)->template check_validity<group>();
269 static_cast<Transporter*>(this)->template publish_dynamic<Data, scheme>(data, group,
270 publisher);
271 }
272
283 template <const Group& group, typename Data,
284 int scheme = transporter_scheme<Data, Transporter>()>
285 void publish(std::shared_ptr<Data> data, const Publisher<Data>& publisher = Publisher<Data>())
286 {
287 publish<group, Data, scheme>(std::shared_ptr<const Data>(data), publisher);
288 }
289
298 template <const Group& group, typename Data,
299 int scheme = transporter_scheme<Data, Transporter>(),
300 Necessity necessity = Necessity::OPTIONAL>
301 void subscribe(std::function<void(const Data&)> f,
302 const Subscriber<Data>& subscriber = Subscriber<Data>())
303 {
304 static_cast<Transporter*>(this)->template check_validity<group>();
305 static_cast<Transporter*>(this)->template subscribe_dynamic<Data, scheme>(f, group,
306 subscriber);
307 }
308
317 template <const Group& group, typename Data,
318 int scheme = transporter_scheme<Data, Transporter>(),
319 Necessity necessity = Necessity::OPTIONAL>
320 void subscribe(std::function<void(std::shared_ptr<const Data>)> f,
321 const Subscriber<Data>& subscriber = Subscriber<Data>())
322 {
323 static_cast<Transporter*>(this)->template check_validity<group>();
324 static_cast<Transporter*>(this)->template subscribe_dynamic<Data, scheme>(f, group,
325 subscriber);
326 }
327
337 template <const Group& group, Necessity necessity = Necessity::OPTIONAL, typename Func>
338 void subscribe(Func f)
339 {
340 // we want to grab the first argument of "f" and then capture "Data" from "const Data& data" and "std::shared_ptr<const Data>"
341 using Data = typename detail::primitive_type<
342 typename std::decay<detail::first_argument<Func>>::type>::type;
343
344 subscribe<group, Data, transporter_scheme<Data, Transporter>(), necessity>(f);
345 }
346
352 template <const Group& group, typename Data,
353 int scheme = transporter_scheme<Data, Transporter>()>
355 {
356 static_cast<Transporter*>(this)->template check_validity<group>();
357 static_cast<Transporter*>(this)->template unsubscribe_dynamic<Data, scheme>(group,
358 subscriber);
359 }
360
362 void unsubscribe_all() { static_cast<Transporter*>(this)->unsubscribe_all(); }
363
364 protected:
366 : InnerTransporterInterface<Transporter, InnerTransporter>(inner)
367 {
368 }
370};
371
372} // namespace middleware
373} // namespace goby
374
375template <class Clock, class Duration>
376int goby::middleware::PollerInterface::poll(const std::chrono::time_point<Clock, Duration>& timeout)
377{
378 return _poll_all(timeout);
379}
380
381template <class Clock, class Duration>
383{
384 if (wait_for == Duration::max())
385 return poll();
386 else
387 return poll(Clock::now() + wait_for);
388}
389
390template <class Clock, class Duration>
391int goby::middleware::PollerInterface::_poll_all(
392 const std::chrono::time_point<Clock, Duration>& timeout)
393{
394 // hold this lock until either we find a polled item or we wait on the condition variable
395 std::unique_ptr<std::unique_lock<std::mutex>> lock(
396 new std::unique_lock<std::mutex>(*poll_mutex_));
397 // std::cout << std::this_thread::get_id() << " _poll_all locking: " << poll_mutex_.get() << std::endl;
398
399 auto poll_all_once = [this, &lock]()
400 {
401 int poll_items = _transporter_poll(lock);
402 for (auto* poller : attached_pollers_) poll_items += poller->_transporter_poll(lock);
403 return poll_items;
404 };
405 int poll_items = poll_all_once();
406 while (poll_items == 0)
407 {
408 if (!lock)
409 throw(goby::Exception(
410 "Poller lock was released by poll() but no poll items were returned"));
411
412 if (timeout == Clock::time_point::max())
413 {
414 cv_->wait(*lock); // wait_until doesn't work well with time_point::max()
415 poll_items = poll_all_once();
416
417 // TODO: fix this message now that zeromq::InterProcessPortal can have
418 // a condition_variable trigger for REQUEST_HOLD_STATE
419 //if (poll_items == 0)
420 // goby::glog.is(goby::util::logger::DEBUG3) &&
421 // goby::glog << "PollerInterface condition_variable: no data (spurious?) wakeup"
422 // << std::endl;
423 }
424 else
425 {
426 if (cv_->wait_until(*lock, timeout) == std::cv_status::no_timeout)
427 poll_items = poll_all_once();
428 else
429 return poll_items;
430 }
431 }
432
433 return poll_items;
434}
435
436#endif
simple exception class for goby applications
Definition exception.h:35
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:61
Recursive inner layer transporter storage or generator.
Definition interface.h:60
InnerTransporter InnerTransporterType
the InnerTransporter type (accessible for other uses)
Definition interface.h:63
Defines the common interface for polling for data on Goby transporters.
Definition interface.h:142
PollerInterface(std::shared_ptr< std::mutex > poll_mutex, std::shared_ptr< std::condition_variable > cv)
Definition interface.h:197
std::shared_ptr< std::mutex > poll_mutex()
access the mutex used for poll synchronization
Definition interface.h:162
int poll(const std::chrono::time_point< Clock, Duration > &timeout=std::chrono::time_point< Clock, Duration >::max())
poll for data. Blocks until a data event occurs or a timeout when a particular time has been reached
Definition interface.h:376
void attach(PollerInterface *poller)
Attach another PollerInterface to this one so that its _transporter_poll() is also called during _pol...
Definition interface.h:176
std::shared_ptr< std::condition_variable > cv()
access the condition variable used for poll synchronization
Definition interface.h:168
Utility class for allowing the various Goby middleware transporters to poll the underlying transport ...
Definition poller.h:39
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition publisher.h:40
Defines the common interface for publishing and subscribing data using static (constexpr) groups on G...
Definition interface.h:235
void subscribe(Func f)
Simplified version of subscribe() that can deduce Data from the first argument of the function (lambd...
Definition interface.h:338
void subscribe(std::function< void(const Data &)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (const reference variant)
Definition interface.h:301
StaticTransporterInterface(InnerTransporter &inner)
Definition interface.h:365
void unsubscribe(const Subscriber< Data > &subscriber=Subscriber< Data >())
Unsubscribe from a specific group and data type.
Definition interface.h:354
void unsubscribe_all()
Unsubscribe from all messages that this transporter has subscribed to.
Definition interface.h:362
void publish(std::shared_ptr< const Data > data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (shared pointer to const data variant)
Definition interface.h:265
void publish(std::shared_ptr< Data > data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (shared pointer to mutable data variant)
Definition interface.h:285
void subscribe(std::function< void(std::shared_ptr< const Data >)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (shared pointer variant)
Definition interface.h:320
void publish(const Data &data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (const reference variant)
Definition interface.h:246
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
Definition subscriber.h:37
goby::util::logger::GroupSetter group(std::string n)
constexpr int scheme()
Placeholder to provide an interface for the scheme() function family.
Definition cstr.h:65
Necessity
Used to tag subscriptions based on their necessity (e.g. required for correct functioning,...
Definition interface.h:223
The global namespace for the Goby project.
STL namespace.