Goby3 3.5.0
2026.05.29
Loading...
Searching...
No Matches
interthread.h
Go to the documentation of this file.
1// Copyright 2016-2026:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6// Copilot <198982749+Copilot@users.noreply.github.com>
7//
8//
9// This file is part of the Goby Underwater Autonomy Project Libraries
10// ("The Goby Libraries").
11//
12// The Goby Libraries are free software: you can redistribute them and/or modify
13// them under the terms of the GNU Lesser General Public License as published by
14// the Free Software Foundation, either version 2.1 of the License, or
15// (at your option) any later version.
16//
17// The Goby Libraries are distributed in the hope that they will be useful,
18// but WITHOUT ANY WARRANTY; without even the implied warranty of
19// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20// GNU Lesser General Public License for more details.
21//
22// You should have received a copy of the GNU Lesser General Public License
23// along with Goby. If not, see <http://www.gnu.org/licenses/>.
24
25#ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERTHREAD_H
26#define GOBY_MIDDLEWARE_TRANSPORT_INTERTHREAD_H
27
28#include <functional> // for fun...
29#include <memory> // for sha...
30#include <mutex> // for mutex
31#include <thread> // for get_id
32
33#include "goby/exception.h" // for Exc...
34#include "goby/middleware/group.h" // for Group
35#include "goby/middleware/marshalling/interface.h" // for Mar...
37#include "goby/middleware/transport/interface.h" // for Sta...
38#include "goby/middleware/transport/null.h" // for Nul...
39#include "goby/middleware/transport/poller.h" // for Poller
40#include "goby/middleware/transport/publisher.h" // for Pub...
41#include "goby/middleware/transport/subscriber.h" // for Sub...
42
43namespace goby
44{
45namespace middleware
46{
58 : public StaticTransporterInterface<InterThreadTransporter, NullTransporter>,
59 public Poller<InterThreadTransporter>
60{
61 private:
62 struct EmptyMessage
63 {
64 };
65
66 public:
67 using implementation_tag = void;
68
69 InterThreadTransporter() : data_mutex_(std::make_shared<std::mutex>()) {}
70
72 {
73 detail::SubscriptionStoreBase::unsubscribe_all(std::this_thread::get_id());
74 detail::SubscriptionStoreBase::remove(std::this_thread::get_id());
75 }
76
78 template <typename Data> static constexpr int scheme() { return MarshallingScheme::CXX_OBJECT; }
79
81 template <const Group& group> void check_validity()
82 {
83 static_assert((group.c_str() != nullptr) && (group.c_str()[0] != '\0'),
84 "goby::middleware::Group must have non-zero length string to publish on the "
85 "InterThread layer");
86 }
87
90 {
91 if ((group.c_str() == nullptr) || (group.c_str()[0] == '\0'))
92 throw(goby::Exception("Group must have a non-empty string for use on InterThread"));
93 }
94
102 template <typename Data, int scheme = scheme<Data>()>
103 void publish_dynamic(const Data& data, const Group& group,
104 const Publisher<Data>& publisher = Publisher<Data>())
105 {
107 std::shared_ptr<Data> data_ptr(new Data(data));
108 publish_dynamic<Data>(data_ptr, group, publisher);
109 }
110
118 template <typename Data, int scheme = scheme<Data>()>
119 void publish_dynamic(std::shared_ptr<const Data> data, const Group& group,
120 const Publisher<Data>& publisher = Publisher<Data>())
121 {
124 }
125
133 template <typename Data, int scheme = scheme<Data>()>
134 void publish_dynamic(std::shared_ptr<Data> data, const Group& group,
135 const Publisher<Data>& publisher = Publisher<Data>())
136 {
137 publish_dynamic<Data, scheme>(std::shared_ptr<const Data>(data), group, publisher);
138 }
139
141 template <const Group& group> void publish_empty()
142 {
143 publish_dynamic<EmptyMessage>(
144 std::shared_ptr<EmptyMessage>(std::make_shared<EmptyMessage>()), group);
145 }
146
153 template <typename Data, int scheme = scheme<Data>()>
154 void subscribe_dynamic(std::function<void(const Data&)> f, const Group& group,
155 const Subscriber<Data>& /*subscriber*/ = Subscriber<Data>())
156 {
158 detail::SubscriptionStore<Data>::subscribe([=](std::shared_ptr<const Data> pd) { f(*pd); },
159 group, std::this_thread::get_id(), data_mutex_,
162 }
163
170 template <typename Data, int scheme = scheme<Data>()>
171 void subscribe_dynamic(std::function<void(std::shared_ptr<const Data>)> f, const Group& group,
172 const Subscriber<Data>& /*subscriber*/ = Subscriber<Data>())
173 {
176 f, group, std::this_thread::get_id(), data_mutex_, Poller<InterThreadTransporter>::cv(),
178 }
179
181 template <const Group& group> void subscribe_empty(const std::function<void()>& f)
182 {
183 subscribe_dynamic<EmptyMessage>([=](const std::shared_ptr<const EmptyMessage>&) { f(); },
184 group);
185 }
186
192 template <typename Data, int scheme = scheme<Data>()>
194 const Subscriber<Data>& /*subscriber*/ = Subscriber<Data>())
195 {
197 detail::SubscriptionStore<Data>::unsubscribe(group, std::this_thread::get_id());
198 }
199
202 {
203 detail::SubscriptionStoreBase::unsubscribe_all(std::this_thread::get_id());
204 }
205
206 private:
208 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock)
209 {
210 return detail::SubscriptionStoreBase::poll_all(std::this_thread::get_id(), lock);
211 }
212
213 private:
214 // protects this thread's DataQueue
215 std::shared_ptr<std::mutex> data_mutex_;
216};
217
218} // namespace middleware
219} // namespace goby
220
221#endif
simple exception class for goby applications
Definition exception.h:35
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:61
A transporter for the interthread layer.
Definition interthread.h:60
void subscribe_dynamic(std::function< void(const Data &)> f, const Group &group, const Subscriber< Data > &=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (const reference variant)....
void check_validity()
Check validity of the Group for interthread use (at compile time)
Definition interthread.h:81
void unsubscribe_dynamic(const Group &group, const Subscriber< Data > &=Subscriber< Data >())
Unsubscribe to a specific run-time defined group and data type. Where possible, prefer the static var...
void publish_dynamic(std::shared_ptr< Data > data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to mutable data variant)....
void check_validity_runtime(const Group &group)
Check validity of the Group for interthread use (for DynamicGroup at run time)
Definition interthread.h:89
void publish_dynamic(std::shared_ptr< const Data > data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to const data variant)....
void subscribe_empty(const std::function< void()> &f)
Subscribe with no data (used to receive a signal from another thread)
void unsubscribe_all()
Unsubscribe from all current subscriptions.
void publish_dynamic(const Data &data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (const reference variant)....
void subscribe_dynamic(std::function< void(std::shared_ptr< const Data >)> f, const Group &group, const Subscriber< Data > &=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (shared pointer variant)....
void publish_empty()
Publish with no data (used to signal another thread)
static constexpr int scheme()
Scheme for interthread is always MarshallingScheme::CXX_OBJECT as the data are not serialized,...
Definition interthread.h:78
std::shared_ptr< std::mutex > poll_mutex()
access the mutex used for poll synchronization
Definition interface.h:162
std::shared_ptr< std::condition_variable > cv()
access the condition variable used for poll synchronization
Definition interface.h:168
Utility class for allowing the various Goby middleware transporters to poll the underlying transport ...
Definition poller.h:39
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition publisher.h:40
Defines the common interface for publishing and subscribing data using static (constexpr) groups on G...
Definition interface.h:235
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
Definition subscriber.h:37
static void remove(std::thread::id thread_id)
static void unsubscribe_all(std::thread::id thread_id)
static int poll_all(std::thread::id thread_id, std::unique_ptr< std::unique_lock< std::mutex > > &lock)
static void subscribe(std::function< void(std::shared_ptr< const Data >)> func, const Group &group, std::thread::id thread_id, std::shared_ptr< std::mutex > data_mutex, std::shared_ptr< std::condition_variable > cv, std::shared_ptr< std::mutex > poller_mutex)
static void unsubscribe(const Group &group, std::thread::id thread_id)
static void publish(std::shared_ptr< const Data > data, const Group &group, const Publisher< Data > &publisher)
goby::util::logger::GroupSetter group(std::string n)
The global namespace for the Goby project.
STL namespace.