Goby3 3.2.3
2025.05.13
Loading...
Searching...
No Matches
subscription_store.h
Go to the documentation of this file.
1// Copyright 2016-2025:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6//
7//
8// This file is part of the Goby Underwater Autonomy Project Libraries
9// ("The Goby Libraries").
10//
11// The Goby Libraries are free software: you can redistribute them and/or modify
12// them under the terms of the GNU Lesser General Public License as published by
13// the Free Software Foundation, either version 2.1 of the License, or
14// (at your option) any later version.
15//
16// The Goby Libraries are distributed in the hope that they will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU Lesser General Public License for more details.
20//
21// You should have received a copy of the GNU Lesser General Public License
22// along with Goby. If not, see <http://www.gnu.org/licenses/>.
23
24#ifndef GOBY_MIDDLEWARE_TRANSPORT_DETAIL_SUBSCRIPTION_STORE_H
25#define GOBY_MIDDLEWARE_TRANSPORT_DETAIL_SUBSCRIPTION_STORE_H
26
27#include <condition_variable>
28#include <functional>
29#include <memory>
30#include <mutex>
31#include <set>
32#include <shared_mutex>
33#include <thread>
34#include <typeindex>
35#include <unordered_map>
36#include <vector>
37
39
40namespace goby
41{
42namespace middleware
43{
44namespace detail
45{
48{
49 private:
50 // for each thread, stores a map of Datas to SubscriptionStores so that can call poll() on all the stores
51 using StoresMap = std::unordered_map<std::type_index, std::shared_ptr<SubscriptionStoreBase>>;
52 static std::unordered_map<std::thread::id, StoresMap> stores_;
53 static std::shared_timed_mutex stores_mutex_;
54
55 public:
57 virtual ~SubscriptionStoreBase() = default;
58
59 // returns number of data items posted to callbacks
60 static int poll_all(std::thread::id thread_id,
61 std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock)
62 {
63 // make a copy so that other threads can subscribe if
64 // necessary in their callbacks
65 StoresMap stores;
66 {
67 std::shared_lock<std::shared_timed_mutex> stores_lock(stores_mutex_);
68 if (stores_.count(thread_id))
69 stores = stores_.at(thread_id);
70 }
71
72 int poll_items = 0;
73 for (auto const& s : stores) poll_items += s.second->poll(thread_id, lock);
74 return poll_items;
75 }
76
77 static void unsubscribe_all(std::thread::id thread_id)
78 {
79 std::shared_lock<std::shared_timed_mutex> stores_lock(stores_mutex_);
80 if (stores_.count(thread_id))
81 {
82 for (auto const& s : stores_.at(thread_id)) s.second->unsubscribe_all_groups(thread_id);
83 }
84 }
85
86 static void remove(std::thread::id thread_id)
87 {
88 std::lock_guard<decltype(stores_mutex_)> lock(stores_mutex_);
89 stores_.erase(thread_id);
90 }
91
92 protected:
93 template <typename StoreType> static void insert(std::thread::id thread_id)
94 {
95 // check the store, and if there isn't one for this type, create one
96 std::lock_guard<decltype(stores_mutex_)> lock(stores_mutex_);
97
98 if (!stores_.count(thread_id))
99 stores_.insert(std::make_pair(thread_id, StoresMap()));
100
101 auto index = std::type_index(typeid(StoreType));
102 if (!stores_.at(thread_id).count(index))
103 stores_.at(thread_id).insert(
104 std::make_pair(index, std::shared_ptr<StoreType>(new StoreType)));
105 }
106
107 protected:
108 virtual int poll(std::thread::id thread_id,
109 std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock) = 0;
110 virtual void unsubscribe_all_groups(std::thread::id thread_id) = 0;
111};
112
114{
115 DataProtection(std::shared_ptr<std::mutex> dm, std::shared_ptr<std::condition_variable_any> pcv,
116 std::shared_ptr<std::timed_mutex> pm)
117 : data_mutex(dm), poller_cv(pcv), poller_mutex(pm)
118 {
119 }
120
121 std::shared_ptr<std::mutex> data_mutex;
122 std::shared_ptr<std::condition_variable_any> poller_cv;
123 std::shared_ptr<std::timed_mutex> poller_mutex;
124};
125
127template <typename Data> class SubscriptionStore : public SubscriptionStoreBase
128{
129 public:
130 static void subscribe(std::function<void(std::shared_ptr<const Data>)> func, const Group& group,
131 std::thread::id thread_id, std::shared_ptr<std::mutex> data_mutex,
132 std::shared_ptr<std::condition_variable_any> cv,
133 std::shared_ptr<std::timed_mutex> poller_mutex)
134 {
135 {
136 std::lock_guard<std::shared_timed_mutex> lock(subscription_mutex_);
137
138 // insert callback
139 auto it =
140 subscription_callbacks_.insert(std::make_pair(thread_id, Callback(group, func)));
141 // insert group with iterator to callback
142 subscription_groups_.insert(std::make_pair(group, it));
143
144 // if necessary, create a DataQueue for this thread
145 auto queue_it = data_.find(thread_id);
146 if (queue_it == data_.end())
147 {
148 auto bool_it_pair = data_.insert(std::make_pair(thread_id, DataQueue()));
149 queue_it = bool_it_pair.first;
150 }
151 queue_it->second.create(group);
152
153 // if we don't have a condition variable already for this thread, store it
154 if (!data_protection_.count(thread_id))
155 data_protection_.insert(std::make_pair(
156 thread_id, detail::DataProtection(data_mutex, cv, poller_mutex)));
157 }
158
159 // try inserting a copy of this templated class via the base class for SubscriptionStoreBase::poll_all to use
160 SubscriptionStoreBase::insert<SubscriptionStore<Data>>(thread_id);
161 }
162
163 static void unsubscribe(const Group& group, std::thread::id thread_id)
164 {
165 {
166 std::lock_guard<std::shared_timed_mutex> lock(subscription_mutex_);
167
168 // iterate over subscriptions for this group, and erase the ones belonging to this thread_id
169 auto range = subscription_groups_.equal_range(group);
170 for (auto it = range.first; it != range.second;)
171 {
172 auto sub_thread_id = it->second->first;
173
174 if (sub_thread_id == thread_id)
175 {
176 subscription_callbacks_.erase(it->second);
177 it = subscription_groups_.erase(it);
178 }
179 else
180 {
181 ++it;
182 }
183 }
184
185 // remove the dataqueue for this group
186 auto queue_it = data_.find(thread_id);
187 queue_it->second.remove(group);
188 }
189 }
190
191 static void publish(std::shared_ptr<const Data> data, const Group& group,
192 const Publisher<Data>& publisher)
193 {
194 // push new data
195 // build up local vector of relevant condition variables while locked
196 std::vector<detail::DataProtection> cv_to_notify;
197 {
198 std::shared_lock<std::shared_timed_mutex> lock(subscription_mutex_);
199
200 auto range = subscription_groups_.equal_range(group);
201 for (auto it = range.first; it != range.second; ++it)
202 {
203 std::thread::id thread_id = it->second->first;
204
205 // don't store a copy if publisher == subscriber, and echo is false
206 if (thread_id != std::this_thread::get_id() || publisher.cfg().echo())
207 {
208 // protect the DataQueue we are writing to
209 std::unique_lock<std::mutex> lock(*(data_protection_.at(thread_id).data_mutex));
210 auto queue_it = data_.find(thread_id);
211 queue_it->second.insert(group, data);
212 cv_to_notify.push_back(data_protection_.at(thread_id));
213 }
214 }
215 }
216
217 // unlock and notify condition variables from local vector
218 for (const auto& data_protection : cv_to_notify)
219 {
220 {
221 // lock to ensure the other thread isn't in the limbo region
222 // between _poll_all() and wait(), where the condition variable
223 // signal would be lost
224
225 std::lock_guard<std::timed_mutex> l(*data_protection.poller_mutex);
226 }
227 data_protection.poller_cv->notify_all();
228 }
229 }
230
231 private:
232 int poll(std::thread::id thread_id,
233 std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock) override
234 {
235 std::vector<std::pair<std::shared_ptr<typename Callback::CallbackType>,
236 std::shared_ptr<const Data>>>
237 data_callbacks;
238 int poll_items_count = 0;
239
240 {
241 std::shared_lock<std::shared_timed_mutex> sub_lock(subscription_mutex_);
242
243 auto queue_it = data_.find(thread_id);
244 if (queue_it == data_.end())
245 return 0; // no subscriptions
246
247 std::unique_lock<std::mutex> data_lock(
248 *(data_protection_.find(thread_id)->second.data_mutex));
249
250 // loop over all Groups stored in this DataQueue
251 for (auto data_it = queue_it->second.cbegin(), end = queue_it->second.cend();
252 data_it != end; ++data_it)
253 {
254 const Group& group = data_it->first;
255 auto group_range = subscription_groups_.equal_range(group);
256 // For a given Group, loop over all subscriptions to this Group
257 for (auto group_it = group_range.first; group_it != group_range.second; ++group_it)
258 {
259 if (group_it->second->first != thread_id)
260 continue;
261
262 // store the callback function and datum for all the elements queued
263 for (auto& datum : data_it->second)
264 {
265 ++poll_items_count;
266 // we have data, no need to keep this lock any longer
267 if (lock)
268 lock.reset();
269 data_callbacks.push_back(
270 std::make_pair(group_it->second->second.callback, datum));
271 }
272 }
273 queue_it->second.clear(group);
274 }
275 }
276
277 // now that we're no longer blocking the subscription or data mutex, actually run the callbacks
278 for (const auto& callback_datum_pair : data_callbacks)
279 (*callback_datum_pair.first)(std::move(callback_datum_pair.second));
280
281 return poll_items_count;
282 }
283
284 void unsubscribe_all_groups(std::thread::id thread_id) override
285 {
286 {
287 std::lock_guard<std::shared_timed_mutex> lock(subscription_mutex_);
288
289 for (auto it = subscription_groups_.begin(); it != subscription_groups_.end();)
290 {
291 auto sub_thread_id = it->second->first;
292
293 if (sub_thread_id == thread_id)
294 {
295 subscription_callbacks_.erase(it->second);
296 it = subscription_groups_.erase(it);
297 }
298 else
299 {
300 ++it;
301 }
302 }
303
304 data_.erase(thread_id);
305 data_protection_.erase(thread_id);
306 }
307 }
308
309 private:
310 struct Callback
311 {
312 using CallbackType = std::function<void(std::shared_ptr<const Data>)>;
313 Callback(const Group& g, const std::function<void(std::shared_ptr<const Data>)>& c)
314 : group(g), callback(new CallbackType(c))
315 {
316 }
317 Group group;
318 std::shared_ptr<CallbackType> callback;
319 };
320
321 class DataQueue
322 {
323 private:
324 std::unordered_map<Group, std::vector<std::shared_ptr<const Data>>> data_;
325
326 public:
327 void create(const Group& g)
328 {
329 auto it = data_.find(g);
330 if (it == data_.end())
331 data_.insert(std::make_pair(g, std::vector<std::shared_ptr<const Data>>()));
332 }
333 void remove(const Group& g) { data_.erase(g); }
334
335 void insert(const Group& g, std::shared_ptr<const Data> datum)
336 {
337 data_.find(g)->second.push_back(datum);
338 }
339 void clear(const Group& g) { data_.find(g)->second.clear(); }
340 bool empty() { return data_.empty(); }
341 typename decltype(data_)::const_iterator cbegin() { return data_.begin(); }
342 typename decltype(data_)::const_iterator cend() { return data_.end(); }
343 };
344
345 // subscriptions for a given thread
346 static std::unordered_multimap<std::thread::id, Callback> subscription_callbacks_;
347 // threads that are subscribed to a given group
348 static std::unordered_multimap<Group,
349 typename decltype(subscription_callbacks_)::const_iterator>
350 subscription_groups_;
351 // condition variable to use for data
352 static std::unordered_map<std::thread::id, detail::DataProtection> data_protection_;
353
354 static std::shared_timed_mutex
355 subscription_mutex_; // protects subscription_callbacks, subscription_groups, data_protection, and the overarching data_ map (but not the DataQueues within it, which are protected by the mutexes stored in data_protection_))
356
357 // data for a given thread
358 static std::unordered_map<std::thread::id, DataQueue> data_;
359};
360
361template <typename Data>
362std::unordered_multimap<std::thread::id, typename SubscriptionStore<Data>::Callback>
363 SubscriptionStore<Data>::subscription_callbacks_;
364template <typename Data>
365std::unordered_map<std::thread::id, typename SubscriptionStore<Data>::DataQueue>
366 SubscriptionStore<Data>::data_;
367template <typename Data>
368std::unordered_multimap<goby::middleware::Group,
369 typename decltype(
370 SubscriptionStore<Data>::subscription_callbacks_)::const_iterator>
371 SubscriptionStore<Data>::subscription_groups_;
372template <typename Data>
373std::unordered_map<std::thread::id, detail::DataProtection>
374 SubscriptionStore<Data>::data_protection_;
375
376template <typename Data> std::shared_timed_mutex SubscriptionStore<Data>::subscription_mutex_;
377
378} // namespace detail
379} // namespace middleware
380} // namespace goby
381
382#endif
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:60
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition publisher.h:40
const goby::middleware::protobuf::TransporterConfig & cfg() const
Returns the metadata configuration.
Definition publisher.h:81
Base class for interthread subscription information. Non-template so it can be stored in a single con...
static void remove(std::thread::id thread_id)
virtual void unsubscribe_all_groups(std::thread::id thread_id)=0
static void insert(std::thread::id thread_id)
static int poll_all(std::thread::id thread_id, std::unique_ptr< std::unique_lock< std::timed_mutex > > &lock)
static void unsubscribe_all(std::thread::id thread_id)
virtual int poll(std::thread::id thread_id, std::unique_ptr< std::unique_lock< std::timed_mutex > > &lock)=0
Storage class for a specific interthread subscription (and related data). Used by InterThreadTranspor...
static void subscribe(std::function< void(std::shared_ptr< const Data >)> func, const Group &group, std::thread::id thread_id, std::shared_ptr< std::mutex > data_mutex, std::shared_ptr< std::condition_variable_any > cv, std::shared_ptr< std::timed_mutex > poller_mutex)
static void unsubscribe(const Group &group, std::thread::id thread_id)
static void publish(std::shared_ptr< const Data > data, const Group &group, const Publisher< Data > &publisher)
goby::util::logger::GroupSetter group(std::string n)
detail namespace with internal helper functions
Definition json.hpp:247
std::string thread_id(std::thread::id i=std::this_thread::get_id())
Definition common.h:53
The global namespace for the Goby project.
builder< json_traits > create()
Definition jwt.h:4168
STL namespace.
std::shared_ptr< std::timed_mutex > poller_mutex
std::shared_ptr< std::mutex > data_mutex
std::shared_ptr< std::condition_variable_any > poller_cv
DataProtection(std::shared_ptr< std::mutex > dm, std::shared_ptr< std::condition_variable_any > pcv, std::shared_ptr< std::timed_mutex > pm)