Goby3 3.5.1
2026.06.04
Loading...
Searching...
No Matches
subscription_store.h
Go to the documentation of this file.
1// Copyright 2016-2026:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6// Copilot <198982749+Copilot@users.noreply.github.com>
7//
8//
9// This file is part of the Goby Underwater Autonomy Project Libraries
10// ("The Goby Libraries").
11//
12// The Goby Libraries are free software: you can redistribute them and/or modify
13// them under the terms of the GNU Lesser General Public License as published by
14// the Free Software Foundation, either version 2.1 of the License, or
15// (at your option) any later version.
16//
17// The Goby Libraries are distributed in the hope that they will be useful,
18// but WITHOUT ANY WARRANTY; without even the implied warranty of
19// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20// GNU Lesser General Public License for more details.
21//
22// You should have received a copy of the GNU Lesser General Public License
23// along with Goby. If not, see <http://www.gnu.org/licenses/>.
24
25#ifndef GOBY_MIDDLEWARE_TRANSPORT_DETAIL_SUBSCRIPTION_STORE_H
26#define GOBY_MIDDLEWARE_TRANSPORT_DETAIL_SUBSCRIPTION_STORE_H
27
28#include <condition_variable>
29#include <functional>
30#include <memory>
31#include <mutex>
32#include <set>
33#include <shared_mutex>
34#include <thread>
35#include <typeindex>
36#include <unordered_map>
37#include <vector>
38
40
41namespace goby
42{
43namespace middleware
44{
45namespace detail
46{
49{
50 private:
51 // Per-thread registry: for each thread id, a map from a store's std::type_index to the store instance, so that poll_all() can call poll() on every store registered for that thread.
52 using StoresMap = std::unordered_map<std::type_index, std::shared_ptr<SubscriptionStoreBase>>;
53 static std::unordered_map<std::thread::id, StoresMap> stores_;
// Guards stores_: taken shared by poll_all()/unsubscribe_all() and exclusively by insert()/remove().
54 static std::shared_timed_mutex stores_mutex_;
55
56 public:
// Defaulted virtual destructor: stores are held and used polymorphically through std::shared_ptr<SubscriptionStoreBase>.
58 virtual ~SubscriptionStoreBase() = default;
59
60 // returns number of data items posted to callbacks
61 static int poll_all(std::thread::id thread_id,
62 std::unique_ptr<std::unique_lock<std::mutex>>& lock)
63 {
64 // make a copy so that other threads can subscribe if
65 // necessary in their callbacks
66 StoresMap stores;
67 {
68 std::shared_lock<std::shared_timed_mutex> stores_lock(stores_mutex_);
69 if (stores_.count(thread_id))
70 stores = stores_.at(thread_id);
71 }
72
73 int poll_items = 0;
74 for (auto const& s : stores) poll_items += s.second->poll(thread_id, lock);
75 return poll_items;
76 }
77
78 static void unsubscribe_all(std::thread::id thread_id)
79 {
80 std::shared_lock<std::shared_timed_mutex> stores_lock(stores_mutex_);
81 if (stores_.count(thread_id))
82 {
83 for (auto const& s : stores_.at(thread_id)) s.second->unsubscribe_all_groups(thread_id);
84 }
85 }
86
87 static void remove(std::thread::id thread_id)
88 {
89 std::lock_guard<decltype(stores_mutex_)> lock(stores_mutex_);
90 stores_.erase(thread_id);
91 }
92
93 protected:
94 template <typename StoreType> static void insert(std::thread::id thread_id)
95 {
96 // check the store, and if there isn't one for this type, create one
97 std::lock_guard<decltype(stores_mutex_)> lock(stores_mutex_);
98
99 if (!stores_.count(thread_id))
100 stores_.insert(std::make_pair(thread_id, StoresMap()));
101
102 auto index = std::type_index(typeid(StoreType));
103 if (!stores_.at(thread_id).count(index))
104 stores_.at(thread_id).insert(
105 std::make_pair(index, std::shared_ptr<StoreType>(new StoreType)));
106 }
107
108 protected:
// Implemented by each concrete store; called by poll_all() for every store registered for thread_id.
// The returned int is summed by poll_all() into its own return value.
109 virtual int poll(std::thread::id thread_id,
110 std::unique_ptr<std::unique_lock<std::mutex>>& lock) = 0;
// Implemented by each concrete store; called by unsubscribe_all() for every store registered for thread_id.
111 virtual void unsubscribe_all_groups(std::thread::id thread_id) = 0;
112};
113
115{
116 DataProtection(std::shared_ptr<std::mutex> dm, std::shared_ptr<std::condition_variable> pcv,
117 std::shared_ptr<std::mutex> pm)
118 : data_mutex(dm), poller_cv(pcv), poller_mutex(pm)
119 {
120 }
121
122 std::shared_ptr<std::mutex> data_mutex;
123 std::shared_ptr<std::condition_variable> poller_cv;
124 std::shared_ptr<std::mutex> poller_mutex;
125};
126
128template <typename Data> class SubscriptionStore : public SubscriptionStoreBase
129{
130 public:
131 static void subscribe(std::function<void(std::shared_ptr<const Data>)> func, const Group& group,
132 std::thread::id thread_id, std::shared_ptr<std::mutex> data_mutex,
133 std::shared_ptr<std::condition_variable> cv,
134 std::shared_ptr<std::mutex> poller_mutex)
135 {
136 {
137 std::lock_guard<std::shared_timed_mutex> lock(subscription_mutex_);
138
139 // insert callback
140 auto it =
141 subscription_callbacks_.insert(std::make_pair(thread_id, Callback(group, func)));
142 // insert group with iterator to callback
143 subscription_groups_.insert(std::make_pair(group, it));
144
145 // if necessary, create a DataQueue for this thread
146 auto queue_it = data_.find(thread_id);
147 if (queue_it == data_.end())
148 {
149 auto bool_it_pair = data_.insert(std::make_pair(thread_id, DataQueue()));
150 queue_it = bool_it_pair.first;
151 }
152 queue_it->second.create(group);
153
154 // if we don't have a condition variable already for this thread, store it
155 if (!data_protection_.count(thread_id))
156 data_protection_.insert(std::make_pair(
157 thread_id, detail::DataProtection(data_mutex, cv, poller_mutex)));
158 }
159
160 // try inserting a copy of this templated class via the base class for SubscriptionStoreBase::poll_all to use
161 SubscriptionStoreBase::insert<SubscriptionStore<Data>>(thread_id);
162 }
163
164 static void unsubscribe(const Group& group, std::thread::id thread_id)
165 {
166 {
167 std::lock_guard<std::shared_timed_mutex> lock(subscription_mutex_);
168
169 // iterate over subscriptions for this group, and erase the ones belonging to this thread_id
170 auto range = subscription_groups_.equal_range(group);
171 for (auto it = range.first; it != range.second;)
172 {
173 auto sub_thread_id = it->second->first;
174
175 if (sub_thread_id == thread_id)
176 {
177 subscription_callbacks_.erase(it->second);
178 it = subscription_groups_.erase(it);
179 }
180 else
181 {
182 ++it;
183 }
184 }
185
186 // remove the dataqueue for this group
187 auto queue_it = data_.find(thread_id);
188 queue_it->second.remove(group);
189 }
190 }
191
192 static void publish(std::shared_ptr<const Data> data, const Group& group,
193 const Publisher<Data>& publisher)
194 {
195 // push new data
196 // build up local vector of relevant condition variables while locked
197 std::vector<detail::DataProtection> cv_to_notify;
198 {
199 std::shared_lock<std::shared_timed_mutex> lock(subscription_mutex_);
200
201 auto range = subscription_groups_.equal_range(group);
202 for (auto it = range.first; it != range.second; ++it)
203 {
204 std::thread::id thread_id = it->second->first;
205
206 // don't store a copy if publisher == subscriber, and echo is false
207 if (thread_id != std::this_thread::get_id() || publisher.cfg().echo())
208 {
209 // protect the DataQueue we are writing to
210 std::unique_lock<std::mutex> lock(*(data_protection_.at(thread_id).data_mutex));
211 auto queue_it = data_.find(thread_id);
212 queue_it->second.insert(group, data);
213 cv_to_notify.push_back(data_protection_.at(thread_id));
214 }
215 }
216 }
217
218 // unlock and notify condition variables from local vector
219 for (const auto& data_protection : cv_to_notify)
220 {
221 {
222 // lock to ensure the other thread isn't in the limbo region
223 // between _poll_all() and wait(), where the condition variable
224 // signal would be lost
225
226 std::lock_guard<std::mutex> l(*data_protection.poller_mutex);
227 }
228 data_protection.poller_cv->notify_all();
229 }
230 }
231
232 private:
233 int poll(std::thread::id thread_id,
234 std::unique_ptr<std::unique_lock<std::mutex>>& lock) override
235 {
236 std::vector<std::pair<std::shared_ptr<typename Callback::CallbackType>,
237 std::shared_ptr<const Data>>>
238 data_callbacks;
239 int poll_items_count = 0;
240
241 {
242 std::shared_lock<std::shared_timed_mutex> sub_lock(subscription_mutex_);
243
244 auto queue_it = data_.find(thread_id);
245 if (queue_it == data_.end())
246 return 0; // no subscriptions
247
248 std::unique_lock<std::mutex> data_lock(
249 *(data_protection_.find(thread_id)->second.data_mutex));
250
251 // loop over all Groups stored in this DataQueue
252 for (auto data_it = queue_it->second.cbegin(), end = queue_it->second.cend();
253 data_it != end; ++data_it)
254 {
255 const Group& group = data_it->first;
256 auto group_range = subscription_groups_.equal_range(group);
257 // For a given Group, loop over all subscriptions to this Group
258 for (auto group_it = group_range.first; group_it != group_range.second; ++group_it)
259 {
260 if (group_it->second->first != thread_id)
261 continue;
262
263 // store the callback function and datum for all the elements queued
264 for (auto& datum : data_it->second)
265 {
266 ++poll_items_count;
267 // we have data, no need to keep this lock any longer
268 if (lock)
269 lock.reset();
270 data_callbacks.push_back(
271 std::make_pair(group_it->second->second.callback, datum));
272 }
273 }
274 queue_it->second.clear(group);
275 }
276 }
277
278 // now that we're no longer blocking the subscription or data mutex, actually run the callbacks
279 for (const auto& callback_datum_pair : data_callbacks)
280 (*callback_datum_pair.first)(std::move(callback_datum_pair.second));
281
282 return poll_items_count;
283 }
284
285 void unsubscribe_all_groups(std::thread::id thread_id) override
286 {
287 {
288 std::lock_guard<std::shared_timed_mutex> lock(subscription_mutex_);
289
290 for (auto it = subscription_groups_.begin(); it != subscription_groups_.end();)
291 {
292 auto sub_thread_id = it->second->first;
293
294 if (sub_thread_id == thread_id)
295 {
296 subscription_callbacks_.erase(it->second);
297 it = subscription_groups_.erase(it);
298 }
299 else
300 {
301 ++it;
302 }
303 }
304
305 data_.erase(thread_id);
306 data_protection_.erase(thread_id);
307 }
308 }
309
310 private:
311 struct Callback
312 {
313 using CallbackType = std::function<void(std::shared_ptr<const Data>)>;
314 Callback(const Group& g, const std::function<void(std::shared_ptr<const Data>)>& c)
315 : group(g), callback(new CallbackType(c))
316 {
317 }
318 Group group;
319 std::shared_ptr<CallbackType> callback;
320 };
321
322 class DataQueue
323 {
324 private:
325 std::unordered_map<Group, std::vector<std::shared_ptr<const Data>>> data_;
326
327 public:
328 void create(const Group& g)
329 {
330 auto it = data_.find(g);
331 if (it == data_.end())
332 data_.insert(std::make_pair(g, std::vector<std::shared_ptr<const Data>>()));
333 }
334 void remove(const Group& g) { data_.erase(g); }
335
336 void insert(const Group& g, std::shared_ptr<const Data> datum)
337 {
338 data_.find(g)->second.push_back(datum);
339 }
340 void clear(const Group& g) { data_.find(g)->second.clear(); }
341 bool empty() { return data_.empty(); }
342 typename decltype(data_)::const_iterator cbegin() { return data_.begin(); }
343 typename decltype(data_)::const_iterator cend() { return data_.end(); }
344 };
345
346 // all subscriptions (group + callback), keyed by the subscribing thread
347 static std::unordered_multimap<std::thread::id, Callback> subscription_callbacks_;
348 // index of subscriptions by group: each entry holds an iterator into subscription_callbacks_
349 static std::unordered_multimap<Group,
350 typename decltype(subscription_callbacks_)::const_iterator>
351 subscription_groups_;
352 // per-thread synchronization handles (data mutex, poller condition variable, poller mutex)
353 static std::unordered_map<std::thread::id, detail::DataProtection> data_protection_;
354
355 static std::shared_timed_mutex
356 subscription_mutex_; // protects subscription_callbacks_, subscription_groups_, data_protection_, and the overarching data_ map (but not the DataQueues within it, which are protected by the mutexes stored in data_protection_)
357
358 // queued data for a given thread
359 static std::unordered_map<std::thread::id, DataQueue> data_;
360};
361
// Out-of-class definitions of the static data members declared in SubscriptionStore<Data>.

// subscriptions keyed by subscribing thread
362template <typename Data>
363std::unordered_multimap<std::thread::id, typename SubscriptionStore<Data>::Callback>
364 SubscriptionStore<Data>::subscription_callbacks_;
// queued data keyed by subscribing thread
365template <typename Data>
366std::unordered_map<std::thread::id, typename SubscriptionStore<Data>::DataQueue>
367 SubscriptionStore<Data>::data_;
// index from group to entries of subscription_callbacks_
368template <typename Data>
369std::unordered_multimap<goby::middleware::Group,
370 typename decltype(
371 SubscriptionStore<Data>::subscription_callbacks_)::const_iterator>
372 SubscriptionStore<Data>::subscription_groups_;
// synchronization handles keyed by subscribing thread
373template <typename Data>
374std::unordered_map<std::thread::id, detail::DataProtection>
375 SubscriptionStore<Data>::data_protection_;
376
// mutex guarding the containers above
377template <typename Data> std::shared_timed_mutex SubscriptionStore<Data>::subscription_mutex_;
378
379} // namespace detail
380} // namespace middleware
381} // namespace goby
382
383#endif
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:61
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition publisher.h:40
const goby::middleware::protobuf::TransporterConfig & cfg() const
Returns the metadata configuration.
Definition publisher.h:81
Base class for interthread subscription information. Non-template so it can be stored in a single con...
static void remove(std::thread::id thread_id)
virtual int poll(std::thread::id thread_id, std::unique_ptr< std::unique_lock< std::mutex > > &lock)=0
virtual void unsubscribe_all_groups(std::thread::id thread_id)=0
static void insert(std::thread::id thread_id)
static void unsubscribe_all(std::thread::id thread_id)
static int poll_all(std::thread::id thread_id, std::unique_ptr< std::unique_lock< std::mutex > > &lock)
Storage class for a specific interthread subscription (and related data). Used by InterThreadTranspor...
static void subscribe(std::function< void(std::shared_ptr< const Data >)> func, const Group &group, std::thread::id thread_id, std::shared_ptr< std::mutex > data_mutex, std::shared_ptr< std::condition_variable > cv, std::shared_ptr< std::mutex > poller_mutex)
static void unsubscribe(const Group &group, std::thread::id thread_id)
static void publish(std::shared_ptr< const Data > data, const Group &group, const Publisher< Data > &publisher)
goby::util::logger::GroupSetter group(std::string n)
detail namespace with internal helper functions
Definition json.hpp:263
std::string thread_id(std::thread::id i=std::this_thread::get_id())
Definition common.h:53
The global namespace for the Goby project.
builder< Clock, json_traits > create(Clock c)
Definition jwt.h:4157
STL namespace.
DataProtection(std::shared_ptr< std::mutex > dm, std::shared_ptr< std::condition_variable > pcv, std::shared_ptr< std::mutex > pm)
std::shared_ptr< std::mutex > data_mutex
std::shared_ptr< std::mutex > poller_mutex
std::shared_ptr< std::condition_variable > poller_cv