132 std::thread::id
thread_id, std::shared_ptr<std::mutex> data_mutex,
133 std::shared_ptr<std::condition_variable> cv,
134 std::shared_ptr<std::mutex> poller_mutex)
137 std::lock_guard<std::shared_timed_mutex> lock(subscription_mutex_);
141 subscription_callbacks_.insert(std::make_pair(
thread_id, Callback(
group, func)));
143 subscription_groups_.insert(std::make_pair(
group, it));
147 if (queue_it == data_.end())
149 auto bool_it_pair = data_.insert(std::make_pair(
thread_id, DataQueue()));
150 queue_it = bool_it_pair.first;
152 queue_it->second.create(
group);
156 data_protection_.insert(std::make_pair(
161 SubscriptionStoreBase::insert<SubscriptionStore<Data>>(
thread_id);
// Unsubscription path for a single group. The function header and several
// statements are not visible in this excerpt.
// NOTE(review): `group` and `queue_it` are defined on lines not shown here —
// confirm their origin before relying on these comments.
//
// Exclusive lock: both subscription tables are modified below.
167 std::lock_guard<std::shared_timed_mutex> lock(subscription_mutex_);
// Every subscription_groups_ entry registered under `group`.
170 auto range = subscription_groups_.equal_range(
group);
// No increment in the loop header: `it` is reassigned from erase() below.
171 for (
auto it = range.first; it != range.second;)
// it->second is an iterator into subscription_callbacks_, whose key is the
// subscribing thread's id.
173 auto sub_thread_id = it->second->first;
// Erase the callback entry first, then the group entry that referred to it.
177 subscription_callbacks_.erase(it->second);
178 it = subscription_groups_.erase(it);
// Drop the queue held for `group` (DataQueue::remove erases the map entry).
188 queue_it->second.remove(
group);
// Publication path. The function header and several statements are not
// visible in this excerpt.
// NOTE(review): `group`, `data` and `queue_it` are defined on lines not shown
// here — confirm their origin.
//
// Copies of the DataProtection records of every thread that received data;
// used for the notification loop further down.
197 std::vector<detail::DataProtection> cv_to_notify;
// Shared (reader) lock on the subscription tables.
199 std::shared_lock<std::shared_timed_mutex> lock(subscription_mutex_);
// Every subscription registered under `group`.
201 auto range = subscription_groups_.equal_range(
group);
202 for (
auto it = range.first; it != range.second; ++it)
// Key of the referenced callback entry, i.e. the subscribing thread's id.
204 std::thread::id
thread_id = it->second->first;
// Hold that thread's data mutex while touching its queue. at() throws
// std::out_of_range if the thread has no DataProtection record.
210 std::unique_lock<std::mutex> lock(*(data_protection_.at(
thread_id).data_mutex));
212 queue_it->second.insert(
group, data);
// Remember whom to wake up.
213 cv_to_notify.push_back(data_protection_.at(
thread_id));
// Notification pass over the records collected above.
// NOTE(review): whether the locks taken above are already released at this
// point depends on scoping braces not visible here — confirm.
219 for (
const auto& data_protection : cv_to_notify)
// Take the poller mutex before notifying the poller's condition variable.
226 std::lock_guard<std::mutex> l(*data_protection.poller_mutex);
228 data_protection.poller_cv->notify_all();
// Polling path: gathers (callback, datum) pairs from the calling thread's
// queue and then invokes the callbacks. The start of the function header and
// several statements are not visible in this excerpt.
// NOTE(review): `thread_id`, `queue_it`, `group` and the name of the vector
// declared next (used below as `data_callbacks`) come from lines not shown —
// confirm.
234 std::unique_ptr<std::unique_lock<std::mutex>>& lock)
override
// Pairs of (shared callback, shared datum) collected for later invocation.
236 std::vector<std::pair<std::shared_ptr<typename Callback::CallbackType>,
237 std::shared_ptr<const Data>>>
// Value returned at the end of the function; its updates are not visible here.
239 int poll_items_count = 0;
// Shared (reader) lock on the subscription tables.
242 std::shared_lock<std::shared_timed_mutex> sub_lock(subscription_mutex_);
// No queue registered for this thread (the action taken is not visible here).
245 if (queue_it == data_.end())
// Guard this thread's queue with its data mutex.
// NOTE(review): find()->second is dereferenced without an end() check.
248 std::unique_lock<std::mutex> data_lock(
249 *(data_protection_.find(
thread_id)->second.data_mutex));
// Walk every group queue held for this thread.
252 for (
auto data_it = queue_it->second.cbegin(), end = queue_it->second.cend();
253 data_it != end; ++data_it)
// Subscriptions registered for `group`.
256 auto group_range = subscription_groups_.equal_range(
group);
258 for (
auto group_it = group_range.first; group_it != group_range.second; ++group_it)
// Test for subscriptions owned by a different thread (the branch body is not
// visible here).
260 if (group_it->second->first !=
thread_id)
264 for (
auto& datum : data_it->second)
// Copy the shared_ptr to the callback together with the datum, so the pair
// keeps both alive independently of the subscription tables.
270 data_callbacks.push_back(
271 std::make_pair(group_it->second->second.callback, datum));
// Empty the group's queue (the group entry itself is kept by clear()).
274 queue_it->second.clear(
group);
// Invoke the collected callbacks.
// NOTE(review): callback_datum_pair is a const reference, so std::move() on
// its member yields a const rvalue and the shared_ptr is copied, not moved.
279 for (
const auto& callback_datum_pair : data_callbacks)
280 (*callback_datum_pair.first)(
std::move(callback_datum_pair.second));
282 return poll_items_count;
// Removes subscriptions across all groups; overrides a base-class virtual.
// thread_id: id of the subscribing thread.
// NOTE(review): the comparison between sub_thread_id and thread_id, and the
// branch that advances `it` without erasing, are on lines not visible in this
// excerpt — presumably only entries owned by thread_id are erased; confirm.
285 void unsubscribe_all_groups(std::thread::id
thread_id)
override
// Exclusive lock: both subscription tables are modified below.
288 std::lock_guard<std::shared_timed_mutex>
lock(subscription_mutex_);
// Full scan of the group table; no increment in the loop header because `it`
// is reassigned from erase() below.
290 for (
auto it = subscription_groups_.begin(); it != subscription_groups_.end();)
// Key of the referenced callback entry, i.e. the subscribing thread's id.
292 auto sub_thread_id = it->second->first;
// Erase the callback entry first, then the group entry that referred to it.
296 subscription_callbacks_.erase(it->second);
297 it = subscription_groups_.erase(it);
313 using CallbackType = std::function<void(std::shared_ptr<const Data>)>;
314 Callback(
const Group& g,
const std::function<
void(std::shared_ptr<const Data>)>& c)
315 :
group(g), callback(new CallbackType(c))
319 std::shared_ptr<CallbackType> callback;
325 std::unordered_map<Group, std::vector<std::shared_ptr<const Data>>> data_;
328 void create(
const Group& g)
330 auto it = data_.find(g);
331 if (it == data_.end())
332 data_.insert(std::make_pair(g, std::vector<std::shared_ptr<const Data>>()));
334 void remove(
const Group& g) { data_.erase(g); }
336 void insert(
const Group& g, std::shared_ptr<const Data> datum)
338 data_.find(g)->second.push_back(datum);
340 void clear(
const Group& g) { data_.find(g)->second.clear(); }
341 bool empty() {
return data_.empty(); }
342 typename decltype(data_)::const_iterator cbegin() {
return data_.begin(); }
343 typename decltype(data_)::const_iterator cend() {
return data_.end(); }
// Subscribing thread id -> callback record; a thread may hold several entries.
347 static std::unordered_multimap<std::thread::id, Callback> subscription_callbacks_;
// Group -> iterator pointing at the matching entry of subscription_callbacks_.
// NOTE(review): inserting into an unordered container may trigger a rehash,
// which per the standard invalidates iterators (references stay valid) —
// confirm that keeping these iterators across later inserts is safe.
349 static std::unordered_multimap<Group,
350 typename decltype(subscription_callbacks_)::const_iterator>
351 subscription_groups_;
// Thread id -> DataProtection record (used elsewhere in this file through its
// data_mutex, poller_mutex and poller_cv members).
353 static std::unordered_map<std::thread::id, detail::DataProtection> data_protection_;
// NOTE(review): the declarator of this mutex is not visible in this excerpt;
// presumably it is subscription_mutex_, which is locked elsewhere as a
// std::shared_timed_mutex — confirm.
355 static std::shared_timed_mutex
// Thread id -> that thread's DataQueue.
359 static std::unordered_map<std::thread::id, DataQueue> data_;
static void subscribe(std::function< void(std::shared_ptr< const Data >)> func, const Group &group, std::thread::id thread_id, std::shared_ptr< std::mutex > data_mutex, std::shared_ptr< std::condition_variable > cv, std::shared_ptr< std::mutex > poller_mutex)