Goby3 3.5.1
2026.06.04
Loading...
Searching...
No Matches
interprocess.h
Go to the documentation of this file.
1// Copyright 2016-2026:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6// Copilot <198982749+Copilot@users.noreply.github.com>
7// Ryan Govostes <rgovostes+git@gmail.com>
8//
9//
10// This file is part of the Goby Underwater Autonomy Project Libraries
11// ("The Goby Libraries").
12//
13// The Goby Libraries are free software: you can redistribute them and/or modify
14// them under the terms of the GNU Lesser General Public License as published by
15// the Free Software Foundation, either version 2.1 of the License, or
16// (at your option) any later version.
17//
18// The Goby Libraries are distributed in the hope that they will be useful,
19// but WITHOUT ANY WARRANTY; without even the implied warranty of
20// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21// GNU Lesser General Public License for more details.
22//
23// You should have received a copy of the GNU Lesser General Public License
24// along with Goby. If not, see <http://www.gnu.org/licenses/>.
25
26#ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERPROCESS_H
27#define GOBY_MIDDLEWARE_TRANSPORT_INTERPROCESS_H
28
29#include <atomic>
30#include <functional>
31#include <sys/types.h>
32#include <thread>
33#include <tuple>
34#include <unistd.h>
35
37
44
45namespace goby
46{
47namespace middleware
48{
49
55template <typename Derived, typename InnerTransporter, typename ImplementationTag>
58 InterProcessTransporterBase<Derived, InnerTransporter, ImplementationTag>,
59 InnerTransporter>,
60 public Poller<InterProcessTransporterBase<Derived, InnerTransporter, ImplementationTag>>
61{
64 InnerTransporter>;
65
66 using PollerType =
68
69 public:
71 using implementation_tag = ImplementationTag;
72
75 {
76 }
78
80
85 template <typename Data> static constexpr int scheme()
86 {
87 int scheme = goby::middleware::scheme<Data>();
88 // if default returns DCCL, use PROTOBUF instead
91 return scheme;
92 }
93
101 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
102 void publish_dynamic(const Data& data, const Group& group,
103 const Publisher<Data>& publisher = Publisher<Data>())
104 {
106 static_cast<Derived*>(this)->template _publish<Data, scheme>(data, group, publisher);
107 this->inner().template publish_dynamic<Data, scheme>(data, group, publisher);
108 }
109
117 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
118 void publish_dynamic(std::shared_ptr<const Data> data, const Group& group,
119 const Publisher<Data>& publisher = Publisher<Data>())
120 {
121 if (data)
122 {
124 static_cast<Derived*>(this)->template _publish<Data, scheme>(*data, group, publisher);
125 this->inner().template publish_dynamic<Data, scheme>(data, group, publisher);
126 }
127 }
128
136 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
137 void publish_dynamic(std::shared_ptr<Data> data, const Group& group,
138 const Publisher<Data>& publisher = Publisher<Data>())
139 {
140 publish_dynamic<Data, scheme>(std::shared_ptr<const Data>(data), group, publisher);
141 }
142
144 void publish_serialized(std::string type_name, int scheme, const std::vector<char>& bytes,
146 {
148 static_cast<Derived*>(this)->_publish_serialized(type_name, scheme, bytes, group);
149 }
150
158 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
159 void subscribe_dynamic(std::function<void(const Data&)> f, const Group& group,
160 const Subscriber<Data>& subscriber = Subscriber<Data>())
161 {
163 static_cast<Derived*>(this)->template _subscribe<Data, scheme>(
164 [=](std::shared_ptr<const Data> d) { f(*d); }, group, subscriber);
165 }
166
174 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
175 void subscribe_dynamic(std::function<void(std::shared_ptr<const Data>)> f, const Group& group,
176 const Subscriber<Data>& subscriber = Subscriber<Data>())
177 {
179 static_cast<Derived*>(this)->template _subscribe<Data, scheme>(f, group, subscriber);
180 }
181
187 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
189 const Subscriber<Data>& subscriber = Subscriber<Data>())
190 {
192 static_cast<Derived*>(this)->template _unsubscribe<Data, scheme>(group, subscriber);
193 }
194
196 void unsubscribe_all() { static_cast<Derived*>(this)->_unsubscribe_all(); }
197
205 std::shared_ptr<SerializationSubscriptionRegex>
206 subscribe_regex(std::function<void(const std::vector<unsigned char>&, int scheme,
207 const std::string& type, const Group& group)>
208 f,
209 const std::set<int>& schemes, const std::string& type_regex = ".*",
210 const std::string& group_regex = ".*")
211 {
212 return static_cast<Derived*>(this)->_subscribe_regex(f, schemes, type_regex, group_regex);
213 }
214
224 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
225 std::shared_ptr<SerializationSubscriptionRegex> subscribe_type_regex(
226 std::function<void(std::shared_ptr<const Data>, const std::string& type)> f,
227 const Group& group, const std::string& type_regex = ".*")
228 {
229 std::regex special_chars{R"([-[\]{}()*+?.,\^$|#\s])"};
230 std::string sanitized_group =
231 std::regex_replace(std::string(group), special_chars, R"(\$&)");
232
233 auto regex_lambda = [f = std::move(f)](const std::vector<unsigned char>& data, int schm,
234 const std::string& type, const Group& grp)
235 {
236 auto data_begin = data.begin(), data_end = data.end(), actual_end = data.end();
237 auto msg =
238 SerializerParserHelper<Data, scheme>::parse(data_begin, data_end, actual_end, type);
239 f(msg, type);
240 };
241
242 return static_cast<Derived*>(this)->_subscribe_regex(regex_lambda, {scheme}, type_regex,
243 "^" + sanitized_group + "$");
244 }
245
254 template <const Group& group, typename Data,
255 int scheme = InterProcessTransporterBase::scheme<Data>()>
257 std::function<void(std::shared_ptr<const Data>, const std::string& type)> f,
258 const std::string& type_regex = ".*")
259 {
260 subscribe_type_regex(f, group, type_regex);
261 }
262
266 template <const Group& group> void check_validity()
267 {
268 static_assert((group.c_str() != nullptr) && (group.c_str()[0] != '\0'),
269 "goby::middleware::Group must have non-zero length string to publish on the "
270 "InterProcess layer");
271 }
272
275 {
276 if ((group.c_str() == nullptr) || (group.c_str()[0] == '\0'))
277 throw(goby::Exception("Group must have a non-empty string for use on InterProcess"));
278 }
279
280 protected:
281 inline static constexpr auto to_portal_group_name_ =
282 detail::concat(ImplementationTag::prefix, "::to_portal");
283 inline static constexpr auto regex_group_name_ =
284 detail::concat(ImplementationTag::prefix, "::regex");
285 inline static constexpr auto from_portal_group_name_ =
286 detail::concat(ImplementationTag::prefix, "::from_portal");
287
288 inline static constexpr Group to_portal_group_{to_portal_group_name_.data()};
289 inline static constexpr Group regex_group_{regex_group_name_.data()};
290 inline static constexpr Group from_portal_group_{from_portal_group_name_.data()};
291
292 private:
293 friend PollerType;
294 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock)
295 {
296 return static_cast<Derived*>(this)->_poll(lock);
297 }
298};
299
305template <typename InnerTransporter, typename ImplementationTag = void> class InterProcessForwarder;
306
311template <typename InnerTransporter, typename ImplementationTag>
313 : public InterProcessTransporterBase<InterProcessForwarder<InnerTransporter, ImplementationTag>,
314 InnerTransporter, ImplementationTag>
315{
316 public:
317 using Base =
319 InnerTransporter, ImplementationTag>;
320
324 InterProcessForwarder(InnerTransporter& inner)
325 : Base(inner), alive_(std::make_shared<std::atomic<bool>>(true))
326 {
327 this->inner()
330 [this](
331 std::shared_ptr<const goby::middleware::protobuf::SerializerTransporterMessage>
332 msg) { _receive_regex_data_forwarded(msg); });
333 }
335 {
336 // Mark as no longer alive so that forwarding lambdas (captured by the
337 // portal's subscription handlers) become no-ops. This avoids
338 // use-after-free when the portal invokes a stale callback after the
339 // forwarder has been destroyed.
340 *alive_ = false;
341
342 this->unsubscribe_all();
343
344 // TODO - remove by adding in an explicit handshake with the unsubscribe_all publication so that we don't delete ourself (and thus our inner()) before the Portal has deleted all the subscriptions
345 usleep(1e5);
346 }
347
348 friend Base;
349
350 private:
351 template <typename Data, int scheme>
352 void _publish(const Data& d, const Group& group, const Publisher<Data>& publisher)
353 {
354 // create and forward publication to edge
355 std::vector<char> bytes(SerializerParserHelper<Data, scheme>::serialize(d));
356 std::string* sbytes = new std::string(bytes.begin(), bytes.end());
357 auto msg = std::make_shared<goby::middleware::protobuf::SerializerTransporterMessage>();
358 auto* key = msg->mutable_key();
359
360 key->set_marshalling_scheme(scheme);
362 key->set_group(std::string(group));
363 msg->set_allocated_data(sbytes);
364
365 *key->mutable_cfg() = publisher.cfg();
366
367 this->inner().template publish<Base::to_portal_group_>(msg);
368 }
369
370 template <typename Data, int scheme>
371 void _subscribe(std::function<void(std::shared_ptr<const Data> d)> f, const Group& group,
372 const Subscriber<Data>& subscriber)
373 {
374 this->inner().template subscribe_dynamic<Data, scheme>(f, group);
375
376 // forward subscription to edge
377 // Capture alive_ by value (shared_ptr copy) so the lambda can detect
378 // when the forwarder has been destroyed and avoid use-after-free.
379 auto alive = alive_;
380 auto inner_publication_lambda = [=](std::shared_ptr<const Data> d)
381 {
382 if (*alive)
383 this->inner().template publish_dynamic<Data, scheme>(d, group);
384 };
385
386 auto subscription = std::make_shared<SerializationSubscription<Data, scheme>>(
387 inner_publication_lambda, group,
389 [=](const Data& d) { return group; }));
390
391 this->inner().template publish<Base::to_portal_group_, SerializationHandlerBase<>>(
392 subscription);
393 }
394
395 template <typename Data, int scheme>
396 void _unsubscribe(const Group& group, const Subscriber<Data>& subscriber)
397 {
398 this->inner().template unsubscribe_dynamic<Data, scheme>(group, subscriber);
399
400 auto unsubscription = std::shared_ptr<SerializationHandlerBase<>>(
401 new SerializationUnSubscription<Data, scheme>(group));
402
403 this->inner().template publish<Base::to_portal_group_, SerializationHandlerBase<>>(
404 unsubscription);
405 }
406
407 void _publish_serialized(std::string type_name, int scheme, const std::vector<char>& bytes,
409 {
410 auto msg = std::make_shared<goby::middleware::protobuf::SerializerTransporterMessage>();
411 auto* key = msg->mutable_key();
412
413 key->set_marshalling_scheme(scheme);
414 key->set_type(type_name);
415 key->set_group(std::string(group));
416 msg->set_data(std::string(bytes.begin(), bytes.end()));
417
418 this->inner().template publish<Base::to_portal_group_>(msg);
419 }
420
421 void _unsubscribe_all()
422 {
423 regex_subscriptions_.clear();
424 auto all = std::make_shared<SerializationUnSubscribeAll>();
425 this->inner().template publish<Base::to_portal_group_, SerializationUnSubscribeAll>(all);
426 }
427
428 std::shared_ptr<SerializationSubscriptionRegex>
429 _subscribe_regex(std::function<void(const std::vector<unsigned char>&, int scheme,
430 const std::string& type, const Group& group)>
431 f,
432 const std::set<int>& schemes, const std::string& type_regex = ".*",
433 const std::string& group_regex = ".*")
434 {
435 auto alive = alive_;
436 auto inner_publication_lambda = [=](const std::vector<unsigned char>& data, int scheme,
437 const std::string& type, const Group& group)
438 {
439 if (!*alive)
440 return;
441 std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage>
443 forwarded_data->mutable_key()->set_marshalling_scheme(scheme);
444 forwarded_data->mutable_key()->set_type(type);
445 forwarded_data->mutable_key()->set_group(group);
446 forwarded_data->set_data(std::string(data.begin(), data.end()));
447 this->inner().template publish<Base::regex_group_>(forwarded_data);
448 };
449
450 auto portal_subscription = std::make_shared<SerializationSubscriptionRegex>(
451 inner_publication_lambda, schemes, type_regex, group_regex);
452 this->inner().template publish<Base::to_portal_group_, SerializationSubscriptionRegex>(
453 portal_subscription);
454
455 auto local_subscription = std::shared_ptr<SerializationSubscriptionRegex>(
456 new SerializationSubscriptionRegex(f, schemes, type_regex, group_regex));
457 regex_subscriptions_.insert(local_subscription);
458 return local_subscription;
459 }
460
461 void _receive_regex_data_forwarded(
462 std::shared_ptr<const goby::middleware::protobuf::SerializerTransporterMessage> msg)
463 {
464 const auto& bytes = msg->data();
465 for (auto& sub : regex_subscriptions_)
466 sub->post(bytes.begin(), bytes.end(), msg->key().marshalling_scheme(),
467 msg->key().type(), msg->key().group());
468 }
469
/// Poll hook for the forwarder; the lock is not used and the result is always 0.
int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& /*lock*/)
{
    // A forwarder is a shell, only the inner Transporter has data
    return 0;
}
474
475 private:
476 std::set<std::shared_ptr<const SerializationSubscriptionRegex>> regex_subscriptions_;
477 // Shared flag checked by forwarding lambdas to avoid accessing `this`
478 // after the forwarder has been destroyed (set to false in the destructor).
479 std::shared_ptr<std::atomic<bool>> alive_;
480};
481
482template <typename Derived, typename InnerTransporter>
484{
485 protected:
486 template <typename Data, int scheme>
487 void _publish(const Data& d, const goby::middleware::Group& group,
488 const middleware::Publisher<Data>& /*publisher*/)
489 {
492 _publish_serialized(type_name, scheme, bytes, group);
493 }
494
495 std::shared_ptr<middleware::SerializationSubscriptionRegex> _subscribe_regex(
496 std::function<void(const std::vector<unsigned char>&, int scheme, const std::string& type,
498 f,
499 const std::set<int>& schemes, const std::string& type_regex, const std::string& group_regex)
500 {
501 auto new_sub = std::make_shared<middleware::SerializationSubscriptionRegex>(
502 f, schemes, type_regex, group_regex);
503 static_cast<Derived*>(this)->_subscribe_regex_serialized(new_sub);
504 return new_sub;
505 }
506
507 template <typename Data, int scheme>
508 void _subscribe(std::function<void(std::shared_ptr<const Data> d)> f,
510 const middleware::Subscriber<Data>& /*subscriber*/)
511 {
512 std::string identifier = this->template _make_identifier<Data, scheme>(
514
515 auto subscription = std::make_shared<middleware::SerializationSubscription<Data, scheme>>(
516 f, group,
518 [=](const Data& /*d*/) { return group; }));
519
520 if (forwarder_subscriptions_.count(identifier) == 0 &&
521 portal_subscriptions_.count(identifier) == 0)
522 static_cast<Derived*>(this)->_do_portal_subscribe(identifier);
523
524 portal_subscriptions_.insert(std::make_pair(identifier, subscription));
525 }
526
527 template <typename Data, int scheme>
531 {
532 std::string identifier = this->template _make_identifier<Data, scheme>(
534
535 portal_subscriptions_.erase(identifier);
536
537 // If no forwarded subscriptions, do the actual unsubscribe
538 if (forwarder_subscriptions_.count(identifier) == 0)
539 static_cast<Derived*>(this)->_do_portal_unsubscribe(identifier);
540 }
541 void _handle_received_data(std::unique_ptr<std::unique_lock<std::mutex>>& lock,
542 const std::string& data)
543 {
544 if (lock)
545 lock.reset();
546
547 std::string group, type;
548 int scheme, process;
549 std::size_t thread;
550 std::tie(group, scheme, type, process, thread) = this->parse_identifier(data);
551 std::string identifier = this->_make_identifier(
553
554 // build a set so if any of the handlers unsubscribes, we still have a pointer to the middleware::SerializationHandlerBase<>
555 std::vector<std::weak_ptr<const middleware::SerializationHandlerBase<>>> subs_to_post;
556 auto portal_range = portal_subscriptions_.equal_range(identifier);
557 for (auto it = portal_range.first; it != portal_range.second; ++it)
558 subs_to_post.push_back(it->second);
559 auto forwarder_it = forwarder_subscriptions_.find(identifier);
560 if (forwarder_it != forwarder_subscriptions_.end())
561 subs_to_post.push_back(forwarder_it->second);
562
563 // actually post the data
564 {
565 auto null_delim_it = std::find(std::begin(data), std::end(data),
567 for (auto& sub : subs_to_post)
568 {
569 if (auto sub_sp = sub.lock())
570 sub_sp->post(null_delim_it + 1, data.end());
571 }
572 }
573
574 if (!regex_subscriptions_.empty())
575 {
576 auto null_delim_it = std::find(std::begin(data), std::end(data),
578
579 bool forwarder_subscription_posted = false;
580 for (auto& sub : regex_subscriptions_)
581 {
582 // only post at most once for forwarders as the threads will filter
583 bool is_forwarded_sub =
584 sub.first != middleware::identifier_part_to_string(std::this_thread::get_id());
585 if (is_forwarded_sub && forwarder_subscription_posted)
586 continue;
587
588 if (sub.second->post(null_delim_it + 1, data.end(), scheme, type, group) &&
589 is_forwarded_sub)
590 forwarder_subscription_posted = true;
591 }
592 }
593 }
594
595 void _unsubscribe_all(const std::string& subscriber_id =
596 middleware::identifier_part_to_string(std::this_thread::get_id()))
597 {
598 // portal unsubscribe
599 if (subscriber_id == middleware::identifier_part_to_string(std::this_thread::get_id()))
600 {
601 for (const auto& p : portal_subscriptions_)
602 {
603 const auto& identifier = p.first;
604 if (forwarder_subscriptions_.count(identifier) == 0)
605 static_cast<Derived*>(this)->_do_portal_unsubscribe(identifier);
606 }
607 portal_subscriptions_.clear();
608 }
609 else // forwarder unsubscribe
610 {
611 while (forwarder_subscription_identifiers_[subscriber_id].size() > 0)
613 subscriber_id,
614 forwarder_subscription_identifiers_[subscriber_id].begin()->first);
615 }
616
617 // regex
618 if (regex_subscriptions_.size() > 0)
619 {
620 regex_subscriptions_.erase(subscriber_id);
621 if (regex_subscriptions_.empty())
622 static_cast<Derived*>(this)->_do_portal_wildcard_unsubscribe();
623 }
624 }
625
627 const std::shared_ptr<const middleware::SerializationHandlerBase<>>& subscription)
628 {
629 std::string identifier = this->_make_identifier(
630 subscription->type_name(), subscription->scheme(), subscription->subscribed_group(),
632
634 goby::glog << "Received subscription forwarded for identifier [" << identifier
635 << "] from subscriber id: " << subscription->subscriber_id() << std::endl;
636
637 switch (subscription->action())
638 {
640 {
641 // insert if this thread hasn't already subscribed
642 if (forwarder_subscription_identifiers_[subscription->subscriber_id()].count(
643 identifier) == 0)
644 {
645 // first to subscribe from a Forwarder
646 if (forwarder_subscriptions_.count(identifier) == 0)
647 {
648 // first to subscribe (locally or forwarded)
649 if (portal_subscriptions_.count(identifier) == 0)
650 static_cast<Derived*>(this)->_do_portal_subscribe(identifier);
651
652 // create Forwarder subscription
653 forwarder_subscriptions_.insert(std::make_pair(identifier, subscription));
654 }
655 forwarder_subscription_identifiers_[subscription->subscriber_id()].insert(
656 std::make_pair(identifier, forwarder_subscriptions_.find(identifier)));
657 }
658 }
659 break;
660
662 {
663 _forwarder_unsubscribe(subscription->subscriber_id(), identifier);
664 }
665 break;
666
667 default: break;
668 }
669 }
670
671 void _forwarder_unsubscribe(const std::string& subscriber_id, const std::string& identifier)
672 {
673 auto it = forwarder_subscription_identifiers_[subscriber_id].find(identifier);
674 if (it != forwarder_subscription_identifiers_[subscriber_id].end())
675 {
676 bool no_forwarder_subscribers = true;
677 for (const auto& p : forwarder_subscription_identifiers_)
678 {
679 if (p.second.count(identifier) != 0)
680 {
681 no_forwarder_subscribers = false;
682 break;
683 }
684 }
685
686 // if no Forwarder subscriptions left
687 if (no_forwarder_subscribers)
688 {
689 // erase the Forwarder subscription
690 forwarder_subscriptions_.erase(it->second);
691
692 // do the actual unsubscribe if we aren't subscribe locally as well
693 if (portal_subscriptions_.count(identifier) == 0)
694 static_cast<Derived*>(this)->_do_portal_unsubscribe(identifier);
695 }
696
697 forwarder_subscription_identifiers_[subscriber_id].erase(it);
698 }
699 }
700
702 const std::shared_ptr<const middleware::SerializationSubscriptionRegex>& new_sub)
703 {
704 if (regex_subscriptions_.empty())
705 static_cast<Derived*>(this)->_do_portal_wildcard_subscribe();
706
707 regex_subscriptions_.insert(std::make_pair(new_sub->subscriber_id(), new_sub));
708 }
709
710 void _publish_serialized(std::string type_name, int scheme, const std::vector<char>& bytes,
712 {
713 std::string identifier =
716 static_cast<Derived*>(this)->_do_publish(identifier, bytes);
717 }
718
719 private:
720 // portal_subscriptions_ and forwarder_subscriptions_: maps identifier to subscription
721 std::unordered_multimap<std::string,
722 std::shared_ptr<const middleware::SerializationHandlerBase<>>>
723 portal_subscriptions_;
724 // only one subscription for each forwarded identifier
725 std::unordered_map<std::string, std::shared_ptr<const middleware::SerializationHandlerBase<>>>
726 forwarder_subscriptions_;
727
728 // maps subscriber_id [thread id as string] to map of identifier to forwarder subscription
729 std::unordered_map<
730 std::string, std::unordered_map<
731 std::string, typename decltype(forwarder_subscriptions_)::const_iterator>>
732 forwarder_subscription_identifiers_;
733
734 // subscriber id to subscription
735 std::unordered_multimap<std::string,
736 std::shared_ptr<const middleware::SerializationSubscriptionRegex>>
737 regex_subscriptions_;
738};
739
740template <typename Derived, typename InnerTransporter, typename ImplementationTag>
742 : public InterProcessTransporterBase<Derived, InnerTransporter, ImplementationTag>,
743 public InterProcessPortalCommon<Derived, InnerTransporter>
744{
745 public:
748
749 InterProcessPortalBase(InnerTransporter& inner) : Base(inner) { _init(); }
751
753
754 friend Base;
755
756 private:
757 void _init()
758 {
760 this->inner().template subscribe<Base::to_portal_group_, SerializerTransporterMessage>(
761 [this](std::shared_ptr<const SerializerTransporterMessage> d)
762 {
763 std::vector<char> data(d->data().begin(), d->data().end());
764 static_cast<Derived*>(this)->_publish_serialized(
765 d->key().type(), d->key().marshalling_scheme(), data,
766 goby::middleware::DynamicGroup(d->key().group()));
767 });
768
769 this->inner().template subscribe<Base::to_portal_group_, SerializationHandlerBase<>>(
770 [this](std::shared_ptr<const middleware::SerializationHandlerBase<>> s)
771 { static_cast<Derived*>(this)->_receive_subscription_forwarded(s); });
772
773 this->inner().template subscribe<Base::to_portal_group_, SerializationSubscriptionRegex>(
774 [this](std::shared_ptr<const middleware::SerializationSubscriptionRegex> s)
775 { static_cast<Derived*>(this)->_subscribe_regex_serialized(s); });
776
777 this->inner().template subscribe<Base::to_portal_group_, SerializationUnSubscribeAll>(
778 [this](std::shared_ptr<const middleware::SerializationUnSubscribeAll> s)
779 { static_cast<Derived*>(this)->_unsubscribe_all(s->subscriber_id()); });
780 }
781};
782
783} // namespace middleware
784} // namespace goby
785
787
788namespace goby
789{
790namespace middleware
791{
792
798template <typename InnerTransporter>
799class InterProcessForwarder<InnerTransporter, void>
800 : public InterProcessForwarder<InnerTransporter, zeromq::detail::InterProcessTag>
801{
802 public:
803 using Base = InterProcessForwarder<InnerTransporter, zeromq::detail::InterProcessTag>;
804
805 [[deprecated("Use zeromq::InterProcessForwarder<> or udpm::InterProcessForwarder<> instead of "
806 "middleware::InterProcessForwarder<>")]]
807 explicit InterProcessForwarder(InnerTransporter& inner)
808 : Base(inner)
809 {
810 }
811};
812
813} // namespace middleware
814} // namespace goby
815
816#endif
simple exception class for goby applications
Definition exception.h:35
Implementation of Group for dynamic (run-time) instantiations. Use Group directly for static (compile...
Definition group.h:121
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:61
Implements the forwarder concept for the interprocess layer.
InterProcessForwarder(InnerTransporter &inner)
Construct a forwarder for the interprocess layer.
InterProcessTransporterBase< InterProcessForwarder< InnerTransporter, ImplementationTag >, InnerTransporter, ImplementationTag > Base
std::string _make_identifier(const goby::middleware::Group &group, IdentifierWildcard wildcard)
Definition identifier.h:88
static std::tuple< std::string, int, std::string, int, std::size_t > parse_identifier(const std::string &identifier)
InterProcessPortalBase(InnerTransporter &inner)
void _subscribe(std::function< void(std::shared_ptr< const Data > d)> f, const goby::middleware::Group &group, const middleware::Subscriber< Data > &)
void _handle_received_data(std::unique_ptr< std::unique_lock< std::mutex > > &lock, const std::string &data)
void _publish_serialized(std::string type_name, int scheme, const std::vector< char > &bytes, const goby::middleware::Group &group)
void _publish(const Data &d, const goby::middleware::Group &group, const middleware::Publisher< Data > &)
void _unsubscribe_all(const std::string &subscriber_id=middleware::identifier_part_to_string(std::this_thread::get_id()))
void _forwarder_unsubscribe(const std::string &subscriber_id, const std::string &identifier)
void _unsubscribe(const goby::middleware::Group &group, const middleware::Subscriber< Data > &=middleware::Subscriber< Data >())
std::shared_ptr< middleware::SerializationSubscriptionRegex > _subscribe_regex(std::function< void(const std::vector< unsigned char > &, int scheme, const std::string &type, const goby::middleware::Group &group)> f, const std::set< int > &schemes, const std::string &type_regex, const std::string &group_regex)
void _subscribe_regex_serialized(const std::shared_ptr< const middleware::SerializationSubscriptionRegex > &new_sub)
void _receive_subscription_forwarded(const std::shared_ptr< const middleware::SerializationHandlerBase<> > &subscription)
Base class for implementing transporters (both portal and forwarder) for the interprocess layer.
void check_validity()
Check validity of the Group for interprocess use (at compile time)
static constexpr int scheme()
returns the marshalling scheme id for a given data type on this layer
void unsubscribe_dynamic(const Group &group, const Subscriber< Data > &subscriber=Subscriber< Data >())
Unsubscribe to a specific run-time defined group and data type. Where possible, prefer the static var...
void check_validity_runtime(const Group &group)
Check validity of the Group for interprocess use (for DynamicGroup at run time)
ImplementationTag implementation_tag
The ImplementationTag for this transporter (allows InterVehiclePortal to match the driver thread's fo...
void subscribe_dynamic(std::function< void(std::shared_ptr< const Data >)> f, const Group &group, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (shared pointer variant)....
void publish_dynamic(const Data &data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (const reference variant)....
void publish_serialized(std::string type_name, int scheme, const std::vector< char > &bytes, const goby::middleware::Group &group)
Publish a message that has already been serialized for the given scheme.
void subscribe_type_regex(std::function< void(std::shared_ptr< const Data >, const std::string &type)> f, const std::string &type_regex=".*")
Subscribe to a number of types within a given group and scheme using a regular expression.
InterProcessTransporterBase(InnerTransporter &inner)
std::shared_ptr< SerializationSubscriptionRegex > subscribe_type_regex(std::function< void(std::shared_ptr< const Data >, const std::string &type)> f, const Group &group, const std::string &type_regex=".*")
Subscribe to a number of types within a given group and scheme using a regular expression.
void subscribe_dynamic(std::function< void(const Data &)> f, const Group &group, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (const reference variant)....
void publish_dynamic(std::shared_ptr< Data > data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to mutable data variant)....
std::shared_ptr< SerializationSubscriptionRegex > subscribe_regex(std::function< void(const std::vector< unsigned char > &, int scheme, const std::string &type, const Group &group)> f, const std::set< int > &schemes, const std::string &type_regex=".*", const std::string &group_regex=".*")
Subscribe to multiple groups and/or types at once using regular expressions.
void publish_dynamic(std::shared_ptr< const Data > data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to const data variant)....
void unsubscribe_all()
Unsubscribe from all current subscriptions.
Utility class for allowing the various Goby middleware transporters to poll the underlying transport ...
Definition poller.h:39
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition publisher.h:40
const goby::middleware::protobuf::TransporterConfig & cfg() const
Returns the metadata configuration.
Definition publisher.h:81
Base class for handling posting callbacks for serialized data types (interprocess and outer)
Defines the common interface for publishing and subscribing data using static (constexpr) groups on G...
Definition interface.h:235
void subscribe(std::function< void(const Data &)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (const reference variant)
Definition interface.h:301
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
Definition subscriber.h:37
goby::util::logger::GroupSetter group(std::string n)
constexpr auto concat(const char(&a)[N1], const char(&b)[N2])
constexpr int scheme()
Placeholder to provide an interface for the scheme() function family.
Definition cstr.h:65
std::string identifier_part_to_string(int i)
Definition identifier.h:60
middleware::InterProcessForwarder< InnerTransporter, detail::InterProcessTag > InterProcessForwarder
The global namespace for the Goby project.
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::PROTOBUF_NAMESPACE_ID::MessageOptions, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
util::FlexOstream glog
Access the Goby logger through this object.
type
Categories for the various JSON types used in JWTs.
Definition jwt.h:2311
STL namespace.
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
Definition interface.h:98
static std::string type_name()
The marshalling scheme specific string name for this type.
Definition interface.h:107
static std::shared_ptr< DataType > parse(CharIterator bytes_begin, CharIterator bytes_end, CharIterator &actual_end, const std::string &type=type_name())
Given a beginning and end iterator to bytes, parse the data and return it.
Definition interface.h:129