Goby3 3.2.3
2025.05.13
Loading...
Searching...
No Matches
interface.h
Go to the documentation of this file.
1// Copyright 2017-2021:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6//
7//
8// This file is part of the Goby Underwater Autonomy Project Libraries
9// ("The Goby Libraries").
10//
11// The Goby Libraries are free software: you can redistribute them and/or modify
12// them under the terms of the GNU Lesser General Public License as published by
13// the Free Software Foundation, either version 2.1 of the License, or
14// (at your option) any later version.
15//
16// The Goby Libraries are distributed in the hope that they will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU Lesser General Public License for more details.
20//
21// You should have received a copy of the GNU Lesser General Public License
22// along with Goby. If not, see <http://www.gnu.org/licenses/>.
23
24#ifndef GOBY_MIDDLEWARE_TRANSPORT_DETAIL_INTERFACE_H
25#define GOBY_MIDDLEWARE_TRANSPORT_DETAIL_INTERFACE_H
26
27#include <chrono>
28#include <condition_variable>
29#include <memory>
30#include <mutex>
31
34
35#include "goby/exception.h"
43
44namespace goby
45{
46namespace middleware
47{
48class NullTransporter;
49
56template <typename Transporter, typename InnerTransporter, typename Enable = void>
58{
59 public:
61 using InnerTransporterType = InnerTransporter;
63 InnerTransporter& inner()
64 {
65 static_assert(std::is_void<Enable>::value, "InnerTransporterInterface must be specialized");
66 }
67 auto innermost()
68 {
69 static_assert(std::is_void<Enable>::value, "InnerTransporterInterface must be specialized");
70 }
71};
72
74template <typename Transporter, typename InnerTransporter>
76 Transporter, InnerTransporter,
77 typename std::enable_if_t<!std::is_same<Transporter, NullTransporter>::value &&
78 !std::is_same<InnerTransporter, NullTransporter>::value>>
79{
81 Transporter, InnerTransporter,
82 typename std::enable_if_t<!std::is_same<Transporter, NullTransporter>::value &&
83 !std::is_same<InnerTransporter, NullTransporter>::value>>;
84
85 public:
87 InnerTransporter& inner() { return inner_; }
88 auto& innermost() { return inner_.innermost(); }
89
91 using InnerTransporterType = InnerTransporter;
92
93 protected:
95 InnerTransporterInterface(InnerTransporter& inner) : inner_(inner) {}
97 InnerTransporterInterface() : own_inner_(new InnerTransporter), inner_(*own_inner_) {}
98
99 private:
100 std::shared_ptr<InnerTransporter> own_inner_;
101 InnerTransporter& inner_;
102};
103
105template <typename Transporter, typename InnerTransporter>
107 Transporter, InnerTransporter,
108 typename std::enable_if_t<!std::is_same<Transporter, NullTransporter>::value &&
109 std::is_same<InnerTransporter, NullTransporter>::value>>
110{
111 public:
113 using InnerTransporterType = InnerTransporter;
115 InnerTransporter& inner() { return inner_; }
116 Transporter& innermost() { return *static_cast<Transporter*>(this); }
117
118 protected:
120 InnerTransporterInterface(InnerTransporter& inner) : inner_(inner) {}
122 InnerTransporterInterface() : own_inner_(new InnerTransporter), inner_(*own_inner_) {}
123
124 private:
125 std::shared_ptr<InnerTransporter> own_inner_;
126 InnerTransporter& inner_;
127};
128
130template <typename Transporter, typename InnerTransporter>
132 Transporter, InnerTransporter,
133 typename std::enable_if_t<std::is_same<Transporter, NullTransporter>::value &&
134 std::is_same<InnerTransporter, NullTransporter>::value>>
135{
136};
137
140{
141 public:
146 template <class Clock = std::chrono::system_clock, class Duration = typename Clock::duration>
147 int poll(const std::chrono::time_point<Clock, Duration>& timeout =
148 std::chrono::time_point<Clock, Duration>::max());
149
154 template <class Clock = std::chrono::system_clock, class Duration = typename Clock::duration>
155 int poll(Duration wait_for);
156
160 std::shared_ptr<std::timed_mutex> poll_mutex() { return poll_mutex_; }
161
166 std::shared_ptr<std::condition_variable_any> cv() { return cv_; }
167
168 protected:
169 PollerInterface(std::shared_ptr<std::timed_mutex> poll_mutex,
170 std::shared_ptr<std::condition_variable_any> cv)
171 : poll_mutex_(poll_mutex), cv_(cv)
172 {
173 }
174
175 private:
176 template <typename Transporter> friend class Poller;
177 // poll the transporter for data
178 virtual int _transporter_poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock) = 0;
179
180 private:
181 // poll all the transporters for data, including a timeout (only called by the outside-most Poller)
182 template <class Clock = std::chrono::system_clock, class Duration = typename Clock::duration>
183 int _poll_all(const std::chrono::time_point<Clock, Duration>& timeout);
184
185 std::shared_ptr<std::timed_mutex> poll_mutex_;
186 // signaled when there's no data for this thread to read during _poll()
187 std::shared_ptr<std::condition_variable_any> cv_;
188};
189
191enum class Necessity
192{
193 REQUIRED,
196};
197
202template <typename Transporter, typename InnerTransporter>
203class StaticTransporterInterface : public InnerTransporterInterface<Transporter, InnerTransporter>
204{
205 public:
213 template <const Group& group, typename Data,
214 int scheme = transporter_scheme<Data, Transporter>()>
215 void publish(const Data& data, const Publisher<Data>& publisher = Publisher<Data>())
216 {
217 static_cast<Transporter*>(this)->template check_validity<group>();
218 static_cast<Transporter*>(this)->template publish_dynamic<Data, scheme>(data, group,
219 publisher);
220 }
221
232 template <const Group& group, typename Data,
233 int scheme = transporter_scheme<Data, Transporter>()>
234 void publish(std::shared_ptr<const Data> data,
235 const Publisher<Data>& publisher = Publisher<Data>())
236 {
237 static_cast<Transporter*>(this)->template check_validity<group>();
238 static_cast<Transporter*>(this)->template publish_dynamic<Data, scheme>(data, group,
239 publisher);
240 }
241
252 template <const Group& group, typename Data,
253 int scheme = transporter_scheme<Data, Transporter>()>
254 void publish(std::shared_ptr<Data> data, const Publisher<Data>& publisher = Publisher<Data>())
255 {
256 publish<group, Data, scheme>(std::shared_ptr<const Data>(data), publisher);
257 }
258
267 template <const Group& group, typename Data,
268 int scheme = transporter_scheme<Data, Transporter>(),
269 Necessity necessity = Necessity::OPTIONAL>
270 void subscribe(std::function<void(const Data&)> f,
271 const Subscriber<Data>& subscriber = Subscriber<Data>())
272 {
273 static_cast<Transporter*>(this)->template check_validity<group>();
274 static_cast<Transporter*>(this)->template subscribe_dynamic<Data, scheme>(f, group,
275 subscriber);
276 }
277
286 template <const Group& group, typename Data,
287 int scheme = transporter_scheme<Data, Transporter>(),
288 Necessity necessity = Necessity::OPTIONAL>
289 void subscribe(std::function<void(std::shared_ptr<const Data>)> f,
290 const Subscriber<Data>& subscriber = Subscriber<Data>())
291 {
292 static_cast<Transporter*>(this)->template check_validity<group>();
293 static_cast<Transporter*>(this)->template subscribe_dynamic<Data, scheme>(f, group,
294 subscriber);
295 }
296
306 template <const Group& group, Necessity necessity = Necessity::OPTIONAL, typename Func>
307 void subscribe(Func f)
308 {
309 // we want to grab the first argument of "f" and then capture "Data" from "const Data& data" and "std::shared_ptr<const Data>"
310 using Data = typename detail::primitive_type<
311 typename std::decay<detail::first_argument<Func>>::type>::type;
312
313 subscribe<group, Data, transporter_scheme<Data, Transporter>(), necessity>(f);
314 }
315
321 template <const Group& group, typename Data,
322 int scheme = transporter_scheme<Data, Transporter>()>
324 {
325 static_cast<Transporter*>(this)->template check_validity<group>();
326 static_cast<Transporter*>(this)->template unsubscribe_dynamic<Data, scheme>(group,
327 subscriber);
328 }
329
331 void unsubscribe_all() { static_cast<Transporter*>(this)->template unsubscribe_all(); }
332
333 protected:
335 : InnerTransporterInterface<Transporter, InnerTransporter>(inner)
336 {
337 }
339};
340
341} // namespace middleware
342} // namespace goby
343
344template <class Clock, class Duration>
345int goby::middleware::PollerInterface::poll(const std::chrono::time_point<Clock, Duration>& timeout)
346{
347 return _poll_all(timeout);
348}
349
350template <class Clock, class Duration>
352{
353 if (wait_for == Duration::max())
354 return poll();
355 else
356 return poll(Clock::now() + wait_for);
357}
358
359template <class Clock, class Duration>
360int goby::middleware::PollerInterface::_poll_all(
361 const std::chrono::time_point<Clock, Duration>& timeout)
362{
363 // hold this lock until either we find a polled item or we wait on the condition variable
364 std::unique_ptr<std::unique_lock<std::timed_mutex>> lock(
365 new std::unique_lock<std::timed_mutex>(*poll_mutex_));
366 // std::cout << std::this_thread::get_id() << " _poll_all locking: " << poll_mutex_.get() << std::endl;
367
368 int poll_items = _transporter_poll(lock);
369 while (poll_items == 0)
370 {
371 if (!lock)
372 throw(goby::Exception(
373 "Poller lock was released by poll() but no poll items were returned"));
374
375 if (timeout == Clock::time_point::max())
376 {
377 cv_->wait(*lock); // wait_until doesn't work well with time_point::max()
378 poll_items = _transporter_poll(lock);
379
380 // TODO: fix this message now that zeromq::InterProcessPortal can have
381 // a condition_variable trigger for REQUEST_HOLD_STATE
382 //if (poll_items == 0)
383 // goby::glog.is(goby::util::logger::DEBUG3) &&
384 // goby::glog << "PollerInterface condition_variable: no data (spurious?) wakeup"
385 // << std::endl;
386 }
387 else
388 {
389 if (cv_->wait_until(*lock, timeout) == std::cv_status::no_timeout)
390 poll_items = _transporter_poll(lock);
391 else
392 return poll_items;
393 }
394 }
395
396 return poll_items;
397}
398
399#endif
simple exception class for goby applications
Definition exception.h:35
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:60
Recursive inner layer transporter storage or generator.
Definition interface.h:58
InnerTransporter InnerTransporterType
the InnerTransporter type (accessible for other uses)
Definition interface.h:61
Defines the common interface for polling for data on Goby transporters.
Definition interface.h:140
std::shared_ptr< std::condition_variable_any > cv()
access the condition variable used for poll synchronization
Definition interface.h:166
PollerInterface(std::shared_ptr< std::timed_mutex > poll_mutex, std::shared_ptr< std::condition_variable_any > cv)
Definition interface.h:169
int poll(const std::chrono::time_point< Clock, Duration > &timeout=std::chrono::time_point< Clock, Duration >::max())
poll for data. Blocks until a data event occurs or a timeout when a particular time has been reached
Definition interface.h:345
std::shared_ptr< std::timed_mutex > poll_mutex()
access the mutex used for poll synchronization
Definition interface.h:160
Utility class for allowing the various Goby middleware transporters to poll the underlying transport ...
Definition poller.h:38
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition publisher.h:40
Defines the common interface for publishing and subscribing data using static (constexpr) groups on G...
Definition interface.h:204
void subscribe(Func f)
Simplified version of subscribe() that can deduce Data from the first argument of the function (lambd...
Definition interface.h:307
void subscribe(std::function< void(const Data &)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (const reference variant)
Definition interface.h:270
StaticTransporterInterface(InnerTransporter &inner)
Definition interface.h:334
void unsubscribe(const Subscriber< Data > &subscriber=Subscriber< Data >())
Unsubscribe to a specific group and data type.
Definition interface.h:323
void unsubscribe_all()
Unsubscribe to all messages that this transporter has subscribed to.
Definition interface.h:331
void publish(std::shared_ptr< const Data > data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (shared pointer to const data variant)
Definition interface.h:234
void publish(std::shared_ptr< Data > data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (shared pointer to mutable data variant)
Definition interface.h:254
void subscribe(std::function< void(std::shared_ptr< const Data >)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (shared pointer variant)
Definition interface.h:289
void publish(const Data &data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (const reference variant)
Definition interface.h:215
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
Definition subscriber.h:37
goby::util::logger::GroupSetter group(std::string n)
constexpr int scheme()
Placeholder to provide an interface for the scheme() function family.
Definition cstr.h:65
Necessity
Used to tag subscriptions based on their necessity (e.g. required for correct functioning,...
Definition interface.h:192
The global namespace for the Goby project.
STL namespace.