Goby3 3.2.3
2025.05.13
Loading...
Searching...
No Matches
interthread.h
Go to the documentation of this file.
1// Copyright 2016-2021:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6//
7//
8// This file is part of the Goby Underwater Autonomy Project Libraries
9// ("The Goby Libraries").
10//
11// The Goby Libraries are free software: you can redistribute them and/or modify
12// them under the terms of the GNU Lesser General Public License as published by
13// the Free Software Foundation, either version 2.1 of the License, or
14// (at your option) any later version.
15//
16// The Goby Libraries are distributed in the hope that they will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU Lesser General Public License for more details.
20//
21// You should have received a copy of the GNU Lesser General Public License
22// along with Goby. If not, see <http://www.gnu.org/licenses/>.
23
24#ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERTHREAD_H
25#define GOBY_MIDDLEWARE_TRANSPORT_INTERTHREAD_H
26
27#include <functional> // for fun...
28#include <memory> // for sha...
29#include <mutex> // for mutex
30#include <thread> // for get_id
31
32#include "goby/exception.h" // for Exc...
33#include "goby/middleware/group.h" // for Group
34#include "goby/middleware/marshalling/interface.h" // for Mar...
36#include "goby/middleware/transport/interface.h" // for Sta...
37#include "goby/middleware/transport/null.h" // for Nul...
38#include "goby/middleware/transport/poller.h" // for Poller
39#include "goby/middleware/transport/publisher.h" // for Pub...
40#include "goby/middleware/transport/subscriber.h" // for Sub...
41
42namespace goby
43{
44namespace middleware
45{
57 : public StaticTransporterInterface<InterThreadTransporter, NullTransporter>,
58 public Poller<InterThreadTransporter>
59{
60 private:
/// Empty tag type used as the Data type by publish_empty() and subscribe_empty().
struct EmptyMessage {};
64
65 public:
66 InterThreadTransporter() : data_mutex_(std::make_shared<std::mutex>()) {}
67
69 {
70 detail::SubscriptionStoreBase::unsubscribe_all(std::this_thread::get_id());
71 detail::SubscriptionStoreBase::remove(std::this_thread::get_id());
72 }
73
75 template <typename Data> static constexpr int scheme() { return MarshallingScheme::CXX_OBJECT; }
76
78 template <const Group& group> void check_validity()
79 {
80 static_assert((group.c_str() != nullptr) && (group.c_str()[0] != '\0'),
81 "goby::middleware::Group must have non-zero length string to publish on the "
82 "InterThread layer");
83 }
84
87 {
88 if ((group.c_str() == nullptr) || (group.c_str()[0] == '\0'))
89 throw(goby::Exception("Group must have a non-empty string for use on InterThread"));
90 }
91
99 template <typename Data, int scheme = scheme<Data>()>
100 void publish_dynamic(const Data& data, const Group& group,
101 const Publisher<Data>& publisher = Publisher<Data>())
102 {
104 std::shared_ptr<Data> data_ptr(new Data(data));
105 publish_dynamic<Data>(data_ptr, group, publisher);
106 }
107
115 template <typename Data, int scheme = scheme<Data>()>
116 void publish_dynamic(std::shared_ptr<const Data> data, const Group& group,
117 const Publisher<Data>& publisher = Publisher<Data>())
118 {
121 }
122
130 template <typename Data, int scheme = scheme<Data>()>
131 void publish_dynamic(std::shared_ptr<Data> data, const Group& group,
132 const Publisher<Data>& publisher = Publisher<Data>())
133 {
134 publish_dynamic<Data, scheme>(std::shared_ptr<const Data>(data), group, publisher);
135 }
136
138 template <const Group& group> void publish_empty()
139 {
140 publish_dynamic<EmptyMessage>(
141 std::shared_ptr<EmptyMessage>(std::make_shared<EmptyMessage>()), group);
142 }
143
150 template <typename Data, int scheme = scheme<Data>()>
151 void subscribe_dynamic(std::function<void(const Data&)> f, const Group& group,
152 const Subscriber<Data>& /*subscriber*/ = Subscriber<Data>())
153 {
155 detail::SubscriptionStore<Data>::subscribe([=](std::shared_ptr<const Data> pd) { f(*pd); },
156 group, std::this_thread::get_id(), data_mutex_,
159 }
160
167 template <typename Data, int scheme = scheme<Data>()>
168 void subscribe_dynamic(std::function<void(std::shared_ptr<const Data>)> f, const Group& group,
169 const Subscriber<Data>& /*subscriber*/ = Subscriber<Data>())
170 {
173 f, group, std::this_thread::get_id(), data_mutex_, Poller<InterThreadTransporter>::cv(),
175 }
176
178 template <const Group& group> void subscribe_empty(const std::function<void()>& f)
179 {
180 subscribe_dynamic<EmptyMessage>([=](const std::shared_ptr<const EmptyMessage>&) { f(); },
181 group);
182 }
183
189 template <typename Data, int scheme = scheme<Data>()>
191 const Subscriber<Data>& /*subscriber*/ = Subscriber<Data>())
192 {
194 detail::SubscriptionStore<Data>::unsubscribe(group, std::this_thread::get_id());
195 }
196
199 {
200 detail::SubscriptionStoreBase::unsubscribe_all(std::this_thread::get_id());
201 }
202
203 private:
205 int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock)
206 {
207 return detail::SubscriptionStoreBase::poll_all(std::this_thread::get_id(), lock);
208 }
209
210 private:
211 // protects this thread's DataQueue
212 std::shared_ptr<std::mutex> data_mutex_;
213};
214
215} // namespace middleware
216} // namespace goby
217
218#endif
Simple exception class for Goby applications.
Definition exception.h:35
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:60
A transporter for the interthread layer.
Definition interthread.h:59
void subscribe_dynamic(std::function< void(const Data &)> f, const Group &group, const Subscriber< Data > &=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (const reference variant)....
void check_validity()
Check validity of the Group for interthread use (at compile time)
Definition interthread.h:78
void unsubscribe_dynamic(const Group &group, const Subscriber< Data > &=Subscriber< Data >())
Unsubscribe from a specific run-time defined group and data type. Where possible, prefer the static var...
void publish_dynamic(std::shared_ptr< Data > data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to mutable data variant)....
void check_validity_runtime(const Group &group)
Check validity of the Group for interthread use (for DynamicGroup at run time)
Definition interthread.h:86
void publish_dynamic(std::shared_ptr< const Data > data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to const data variant)....
void subscribe_empty(const std::function< void()> &f)
Subscribe with no data (used to receive a signal from another thread)
void unsubscribe_all()
Unsubscribe from all current subscriptions.
void publish_dynamic(const Data &data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (const reference variant)....
void subscribe_dynamic(std::function< void(std::shared_ptr< const Data >)> f, const Group &group, const Subscriber< Data > &=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (shared pointer variant)....
void publish_empty()
Publish with no data (used to signal another thread)
static constexpr int scheme()
Scheme for interthread is always MarshallingScheme::CXX_OBJECT as the data are not serialized,...
Definition interthread.h:75
std::shared_ptr< std::condition_variable_any > cv()
access the condition variable used for poll synchronization
Definition interface.h:166
std::shared_ptr< std::timed_mutex > poll_mutex()
access the mutex used for poll synchronization
Definition interface.h:160
Utility class for allowing the various Goby middleware transporters to poll the underlying transport ...
Definition poller.h:38
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition publisher.h:40
Defines the common interface for publishing and subscribing data using static (constexpr) groups on G...
Definition interface.h:204
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
Definition subscriber.h:37
static void remove(std::thread::id thread_id)
static int poll_all(std::thread::id thread_id, std::unique_ptr< std::unique_lock< std::timed_mutex > > &lock)
static void unsubscribe_all(std::thread::id thread_id)
static void subscribe(std::function< void(std::shared_ptr< const Data >)> func, const Group &group, std::thread::id thread_id, std::shared_ptr< std::mutex > data_mutex, std::shared_ptr< std::condition_variable_any > cv, std::shared_ptr< std::timed_mutex > poller_mutex)
static void unsubscribe(const Group &group, std::thread::id thread_id)
static void publish(std::shared_ptr< const Data > data, const Group &group, const Publisher< Data > &publisher)
goby::util::logger::GroupSetter group(std::string n)
The global namespace for the Goby project.
STL namespace.