Goby3 3.4.0
2026.04.13
Loading...
Searching...
No Matches
interprocess.h
Go to the documentation of this file.
1// Copyright 2016-2026:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6// Ryan Govostes <rgovostes+git@gmail.com>
7//
8//
9// This file is part of the Goby Underwater Autonomy Project Libraries
10// ("The Goby Libraries").
11//
12// The Goby Libraries are free software: you can redistribute them and/or modify
13// them under the terms of the GNU Lesser General Public License as published by
14// the Free Software Foundation, either version 2.1 of the License, or
15// (at your option) any later version.
16//
17// The Goby Libraries are distributed in the hope that they will be useful,
18// but WITHOUT ANY WARRANTY; without even the implied warranty of
19// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20// GNU Lesser General Public License for more details.
21//
22// You should have received a copy of the GNU Lesser General Public License
23// along with Goby. If not, see <http://www.gnu.org/licenses/>.
24
25#ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERPROCESS_H
26#define GOBY_MIDDLEWARE_TRANSPORT_INTERPROCESS_H
27
28#include <atomic>
29#include <functional>
30#include <sys/types.h>
31#include <thread>
32#include <tuple>
33#include <unistd.h>
34
36
43
44namespace goby
45{
46namespace middleware
47{
48
54template <typename Derived, typename InnerTransporter, typename ImplementationTag>
57 InterProcessTransporterBase<Derived, InnerTransporter, ImplementationTag>,
58 InnerTransporter>,
59 public Poller<InterProcessTransporterBase<Derived, InnerTransporter, ImplementationTag>>
60{
63 InnerTransporter>;
64
65 using PollerType =
67
68 public:
70 using implementation_tag = ImplementationTag;
71
74 {
75 }
77
79
84 template <typename Data> static constexpr int scheme()
85 {
86 int scheme = goby::middleware::scheme<Data>();
87 // if default returns DCCL, use PROTOBUF instead
90 return scheme;
91 }
92
100 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
101 void publish_dynamic(const Data& data, const Group& group,
102 const Publisher<Data>& publisher = Publisher<Data>())
103 {
105 static_cast<Derived*>(this)->template _publish<Data, scheme>(data, group, publisher);
106 this->inner().template publish_dynamic<Data, scheme>(data, group, publisher);
107 }
108
116 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
117 void publish_dynamic(std::shared_ptr<const Data> data, const Group& group,
118 const Publisher<Data>& publisher = Publisher<Data>())
119 {
120 if (data)
121 {
123 static_cast<Derived*>(this)->template _publish<Data, scheme>(*data, group, publisher);
124 this->inner().template publish_dynamic<Data, scheme>(data, group, publisher);
125 }
126 }
127
135 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
136 void publish_dynamic(std::shared_ptr<Data> data, const Group& group,
137 const Publisher<Data>& publisher = Publisher<Data>())
138 {
139 publish_dynamic<Data, scheme>(std::shared_ptr<const Data>(data), group, publisher);
140 }
141
143 void publish_serialized(std::string type_name, int scheme, const std::vector<char>& bytes,
145 {
147 static_cast<Derived*>(this)->_publish_serialized(type_name, scheme, bytes, group);
148 }
149
157 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
158 void subscribe_dynamic(std::function<void(const Data&)> f, const Group& group,
159 const Subscriber<Data>& subscriber = Subscriber<Data>())
160 {
162 static_cast<Derived*>(this)->template _subscribe<Data, scheme>(
163 [=](std::shared_ptr<const Data> d) { f(*d); }, group, subscriber);
164 }
165
173 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
174 void subscribe_dynamic(std::function<void(std::shared_ptr<const Data>)> f, const Group& group,
175 const Subscriber<Data>& subscriber = Subscriber<Data>())
176 {
178 static_cast<Derived*>(this)->template _subscribe<Data, scheme>(f, group, subscriber);
179 }
180
186 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
188 const Subscriber<Data>& subscriber = Subscriber<Data>())
189 {
191 static_cast<Derived*>(this)->template _unsubscribe<Data, scheme>(group, subscriber);
192 }
193
195 void unsubscribe_all() { static_cast<Derived*>(this)->_unsubscribe_all(); }
196
204 std::shared_ptr<SerializationSubscriptionRegex>
205 subscribe_regex(std::function<void(const std::vector<unsigned char>&, int scheme,
206 const std::string& type, const Group& group)>
207 f,
208 const std::set<int>& schemes, const std::string& type_regex = ".*",
209 const std::string& group_regex = ".*")
210 {
211 return static_cast<Derived*>(this)->_subscribe_regex(f, schemes, type_regex, group_regex);
212 }
213
223 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
224 std::shared_ptr<SerializationSubscriptionRegex> subscribe_type_regex(
225 std::function<void(std::shared_ptr<const Data>, const std::string& type)> f,
226 const Group& group, const std::string& type_regex = ".*")
227 {
228 std::regex special_chars{R"([-[\]{}()*+?.,\^$|#\s])"};
229 std::string sanitized_group =
230 std::regex_replace(std::string(group), special_chars, R"(\$&)");
231
232 auto regex_lambda = [=](const std::vector<unsigned char>& data, int schm,
233 const std::string& type, const Group& grp)
234 {
235 auto data_begin = data.begin(), data_end = data.end(), actual_end = data.end();
236 auto msg =
237 SerializerParserHelper<Data, scheme>::parse(data_begin, data_end, actual_end, type);
238 f(msg, type);
239 };
240
241 return static_cast<Derived*>(this)->_subscribe_regex(regex_lambda, {scheme}, type_regex,
242 "^" + sanitized_group + "$");
243 }
244
253 template <const Group& group, typename Data,
254 int scheme = InterProcessTransporterBase::scheme<Data>()>
256 std::function<void(std::shared_ptr<const Data>, const std::string& type)> f,
257 const std::string& type_regex = ".*")
258 {
259 subscribe_type_regex(f, group, type_regex);
260 }
261
265 template <const Group& group> void check_validity()
266 {
267 static_assert((group.c_str() != nullptr) && (group.c_str()[0] != '\0'),
268 "goby::middleware::Group must have non-zero length string to publish on the "
269 "InterProcess layer");
270 }
271
274 {
275 if ((group.c_str() == nullptr) || (group.c_str()[0] == '\0'))
276 throw(goby::Exception("Group must have a non-empty string for use on InterProcess"));
277 }
278
279 protected:
280 inline static constexpr auto to_portal_group_name_ =
281 detail::concat(ImplementationTag::prefix, "::to_portal");
282 inline static constexpr auto regex_group_name_ =
283 detail::concat(ImplementationTag::prefix, "::regex");
284 inline static constexpr auto from_portal_group_name_ =
285 detail::concat(ImplementationTag::prefix, "::from_portal");
286
287 inline static constexpr Group to_portal_group_{to_portal_group_name_.data()};
288 inline static constexpr Group regex_group_{regex_group_name_.data()};
289 inline static constexpr Group from_portal_group_{from_portal_group_name_.data()};
290
291 private:
292 friend PollerType;
293 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock)
294 {
295 return static_cast<Derived*>(this)->_poll(lock);
296 }
297};
298
304template <typename InnerTransporter, typename ImplementationTag = void> class InterProcessForwarder;
305
310template <typename InnerTransporter, typename ImplementationTag>
312 : public InterProcessTransporterBase<InterProcessForwarder<InnerTransporter, ImplementationTag>,
313 InnerTransporter, ImplementationTag>
314{
315 public:
316 using Base =
318 InnerTransporter, ImplementationTag>;
319
323 InterProcessForwarder(InnerTransporter& inner)
324 : Base(inner), alive_(std::make_shared<std::atomic<bool>>(true))
325 {
326 this->inner()
329 [this](
330 std::shared_ptr<const goby::middleware::protobuf::SerializerTransporterMessage>
331 msg) { _receive_regex_data_forwarded(msg); });
332 }
334 {
335 // Mark as no longer alive so that forwarding lambdas (captured by the
336 // portal's subscription handlers) become no-ops. This avoids
337 // use-after-free when the portal invokes a stale callback after the
338 // forwarder has been destroyed.
339 *alive_ = false;
340
341 this->unsubscribe_all();
342
343 // TODO - remove by adding in an explicit handshake with the unsubscribe_all publication so that we don't delete ourself (and thus our inner()) before the Portal has deleted all the subscriptions
344 usleep(1e5);
345 }
346
347 friend Base;
348
349 private:
350 template <typename Data, int scheme>
351 void _publish(const Data& d, const Group& group, const Publisher<Data>& publisher)
352 {
353 // create and forward publication to edge
354 std::vector<char> bytes(SerializerParserHelper<Data, scheme>::serialize(d));
355 std::string* sbytes = new std::string(bytes.begin(), bytes.end());
356 auto msg = std::make_shared<goby::middleware::protobuf::SerializerTransporterMessage>();
357 auto* key = msg->mutable_key();
358
359 key->set_marshalling_scheme(scheme);
361 key->set_group(std::string(group));
362 msg->set_allocated_data(sbytes);
363
364 *key->mutable_cfg() = publisher.cfg();
365
366 this->inner().template publish<Base::to_portal_group_>(msg);
367 }
368
369 template <typename Data, int scheme>
370 void _subscribe(std::function<void(std::shared_ptr<const Data> d)> f, const Group& group,
371 const Subscriber<Data>& subscriber)
372 {
373 this->inner().template subscribe_dynamic<Data, scheme>(f, group);
374
375 // forward subscription to edge
376 // Capture alive_ by value (shared_ptr copy) so the lambda can detect
377 // when the forwarder has been destroyed and avoid use-after-free.
378 auto alive = alive_;
379 auto inner_publication_lambda = [=](std::shared_ptr<const Data> d)
380 {
381 if (*alive)
382 this->inner().template publish_dynamic<Data, scheme>(d, group);
383 };
384
385 auto subscription = std::make_shared<SerializationSubscription<Data, scheme>>(
386 inner_publication_lambda, group,
388 [=](const Data& d) { return group; }));
389
390 this->inner().template publish<Base::to_portal_group_, SerializationHandlerBase<>>(
391 subscription);
392 }
393
394 template <typename Data, int scheme>
395 void _unsubscribe(const Group& group, const Subscriber<Data>& subscriber)
396 {
397 this->inner().template unsubscribe_dynamic<Data, scheme>(group, subscriber);
398
399 auto unsubscription = std::shared_ptr<SerializationHandlerBase<>>(
400 new SerializationUnSubscription<Data, scheme>(group));
401
402 this->inner().template publish<Base::to_portal_group_, SerializationHandlerBase<>>(
403 unsubscription);
404 }
405
406 void _publish_serialized(std::string type_name, int scheme, const std::vector<char>& bytes,
408 {
409 auto msg = std::make_shared<goby::middleware::protobuf::SerializerTransporterMessage>();
410 auto* key = msg->mutable_key();
411
412 key->set_marshalling_scheme(scheme);
413 key->set_type(type_name);
414 key->set_group(std::string(group));
415 msg->set_data(std::string(bytes.begin(), bytes.end()));
416
417 this->inner().template publish<Base::to_portal_group_>(msg);
418 }
419
420 void _unsubscribe_all()
421 {
422 regex_subscriptions_.clear();
423 auto all = std::make_shared<SerializationUnSubscribeAll>();
424 this->inner().template publish<Base::to_portal_group_, SerializationUnSubscribeAll>(all);
425 }
426
427 std::shared_ptr<SerializationSubscriptionRegex>
428 _subscribe_regex(std::function<void(const std::vector<unsigned char>&, int scheme,
429 const std::string& type, const Group& group)>
430 f,
431 const std::set<int>& schemes, const std::string& type_regex = ".*",
432 const std::string& group_regex = ".*")
433 {
434 auto alive = alive_;
435 auto inner_publication_lambda = [=](const std::vector<unsigned char>& data, int scheme,
436 const std::string& type, const Group& group)
437 {
438 if (!*alive)
439 return;
440 std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage>
442 forwarded_data->mutable_key()->set_marshalling_scheme(scheme);
443 forwarded_data->mutable_key()->set_type(type);
444 forwarded_data->mutable_key()->set_group(group);
445 forwarded_data->set_data(std::string(data.begin(), data.end()));
446 this->inner().template publish<Base::regex_group_>(forwarded_data);
447 };
448
449 auto portal_subscription = std::make_shared<SerializationSubscriptionRegex>(
450 inner_publication_lambda, schemes, type_regex, group_regex);
451 this->inner().template publish<Base::to_portal_group_, SerializationSubscriptionRegex>(
452 portal_subscription);
453
454 auto local_subscription = std::shared_ptr<SerializationSubscriptionRegex>(
455 new SerializationSubscriptionRegex(f, schemes, type_regex, group_regex));
456 regex_subscriptions_.insert(local_subscription);
457 return local_subscription;
458 }
459
460 void _receive_regex_data_forwarded(
461 std::shared_ptr<const goby::middleware::protobuf::SerializerTransporterMessage> msg)
462 {
463 const auto& bytes = msg->data();
464 for (auto& sub : regex_subscriptions_)
465 sub->post(bytes.begin(), bytes.end(), msg->key().marshalling_scheme(),
466 msg->key().type(), msg->key().group());
467 }
468
469 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock)
470 {
471 return 0;
472 } // A forwarder is a shell, only the inner Transporter has data
473
474 private:
475 std::set<std::shared_ptr<const SerializationSubscriptionRegex>> regex_subscriptions_;
476 // Shared flag checked by forwarding lambdas to avoid accessing `this`
477 // after the forwarder has been destroyed (set to false in the destructor).
478 std::shared_ptr<std::atomic<bool>> alive_;
479};
480
481template <typename Derived, typename InnerTransporter>
483{
484 protected:
485 template <typename Data, int scheme>
486 void _publish(const Data& d, const goby::middleware::Group& group,
487 const middleware::Publisher<Data>& /*publisher*/)
488 {
491 _publish_serialized(type_name, scheme, bytes, group);
492 }
493
494 std::shared_ptr<middleware::SerializationSubscriptionRegex> _subscribe_regex(
495 std::function<void(const std::vector<unsigned char>&, int scheme, const std::string& type,
497 f,
498 const std::set<int>& schemes, const std::string& type_regex, const std::string& group_regex)
499 {
500 auto new_sub = std::make_shared<middleware::SerializationSubscriptionRegex>(
501 f, schemes, type_regex, group_regex);
502 static_cast<Derived*>(this)->_subscribe_regex_serialized(new_sub);
503 return new_sub;
504 }
505
506 template <typename Data, int scheme>
507 void _subscribe(std::function<void(std::shared_ptr<const Data> d)> f,
509 const middleware::Subscriber<Data>& /*subscriber*/)
510 {
511 std::string identifier = this->template _make_identifier<Data, scheme>(
513
514 auto subscription = std::make_shared<middleware::SerializationSubscription<Data, scheme>>(
515 f, group,
517 [=](const Data& /*d*/) { return group; }));
518
519 if (forwarder_subscriptions_.count(identifier) == 0 &&
520 portal_subscriptions_.count(identifier) == 0)
521 static_cast<Derived*>(this)->_do_portal_subscribe(identifier);
522
523 portal_subscriptions_.insert(std::make_pair(identifier, subscription));
524 }
525
526 template <typename Data, int scheme>
530 {
531 std::string identifier = this->template _make_identifier<Data, scheme>(
533
534 portal_subscriptions_.erase(identifier);
535
536 // If no forwarded subscriptions, do the actual unsubscribe
537 if (forwarder_subscriptions_.count(identifier) == 0)
538 static_cast<Derived*>(this)->_do_portal_unsubscribe(identifier);
539 }
540 void _handle_received_data(std::unique_ptr<std::unique_lock<std::mutex>>& lock,
541 const std::string& data)
542 {
543 if (lock)
544 lock.reset();
545
546 std::string group, type;
547 int scheme, process;
548 std::size_t thread;
549 std::tie(group, scheme, type, process, thread) = this->parse_identifier(data);
550 std::string identifier = this->_make_identifier(
552
553 // build a set so if any of the handlers unsubscribes, we still have a pointer to the middleware::SerializationHandlerBase<>
554 std::vector<std::weak_ptr<const middleware::SerializationHandlerBase<>>> subs_to_post;
555 auto portal_range = portal_subscriptions_.equal_range(identifier);
556 for (auto it = portal_range.first; it != portal_range.second; ++it)
557 subs_to_post.push_back(it->second);
558 auto forwarder_it = forwarder_subscriptions_.find(identifier);
559 if (forwarder_it != forwarder_subscriptions_.end())
560 subs_to_post.push_back(forwarder_it->second);
561
562 // actually post the data
563 {
564 auto null_delim_it = std::find(std::begin(data), std::end(data),
566 for (auto& sub : subs_to_post)
567 {
568 if (auto sub_sp = sub.lock())
569 sub_sp->post(null_delim_it + 1, data.end());
570 }
571 }
572
573 if (!regex_subscriptions_.empty())
574 {
575 auto null_delim_it = std::find(std::begin(data), std::end(data),
577
578 bool forwarder_subscription_posted = false;
579 for (auto& sub : regex_subscriptions_)
580 {
581 // only post at most once for forwarders as the threads will filter
582 bool is_forwarded_sub =
583 sub.first != middleware::identifier_part_to_string(std::this_thread::get_id());
584 if (is_forwarded_sub && forwarder_subscription_posted)
585 continue;
586
587 if (sub.second->post(null_delim_it + 1, data.end(), scheme, type, group) &&
588 is_forwarded_sub)
589 forwarder_subscription_posted = true;
590 }
591 }
592 }
593
594 void _unsubscribe_all(const std::string& subscriber_id =
595 middleware::identifier_part_to_string(std::this_thread::get_id()))
596 {
597 // portal unsubscribe
598 if (subscriber_id == middleware::identifier_part_to_string(std::this_thread::get_id()))
599 {
600 for (const auto& p : portal_subscriptions_)
601 {
602 const auto& identifier = p.first;
603 if (forwarder_subscriptions_.count(identifier) == 0)
604 static_cast<Derived*>(this)->_do_portal_unsubscribe(identifier);
605 }
606 portal_subscriptions_.clear();
607 }
608 else // forwarder unsubscribe
609 {
610 while (forwarder_subscription_identifiers_[subscriber_id].size() > 0)
612 subscriber_id,
613 forwarder_subscription_identifiers_[subscriber_id].begin()->first);
614 }
615
616 // regex
617 if (regex_subscriptions_.size() > 0)
618 {
619 regex_subscriptions_.erase(subscriber_id);
620 if (regex_subscriptions_.empty())
621 static_cast<Derived*>(this)->_do_portal_wildcard_unsubscribe();
622 }
623 }
624
626 const std::shared_ptr<const middleware::SerializationHandlerBase<>>& subscription)
627 {
628 std::string identifier = this->_make_identifier(
629 subscription->type_name(), subscription->scheme(), subscription->subscribed_group(),
631
633 goby::glog << "Received subscription forwarded for identifier [" << identifier
634 << "] from subscriber id: " << subscription->subscriber_id() << std::endl;
635
636 switch (subscription->action())
637 {
639 {
640 // insert if this thread hasn't already subscribed
641 if (forwarder_subscription_identifiers_[subscription->subscriber_id()].count(
642 identifier) == 0)
643 {
644 // first to subscribe from a Forwarder
645 if (forwarder_subscriptions_.count(identifier) == 0)
646 {
647 // first to subscribe (locally or forwarded)
648 if (portal_subscriptions_.count(identifier) == 0)
649 static_cast<Derived*>(this)->_do_portal_subscribe(identifier);
650
651 // create Forwarder subscription
652 forwarder_subscriptions_.insert(std::make_pair(identifier, subscription));
653 }
654 forwarder_subscription_identifiers_[subscription->subscriber_id()].insert(
655 std::make_pair(identifier, forwarder_subscriptions_.find(identifier)));
656 }
657 }
658 break;
659
661 {
662 _forwarder_unsubscribe(subscription->subscriber_id(), identifier);
663 }
664 break;
665
666 default: break;
667 }
668 }
669
670 void _forwarder_unsubscribe(const std::string& subscriber_id, const std::string& identifier)
671 {
672 auto it = forwarder_subscription_identifiers_[subscriber_id].find(identifier);
673 if (it != forwarder_subscription_identifiers_[subscriber_id].end())
674 {
675 bool no_forwarder_subscribers = true;
676 for (const auto& p : forwarder_subscription_identifiers_)
677 {
678 if (p.second.count(identifier) != 0)
679 {
680 no_forwarder_subscribers = false;
681 break;
682 }
683 }
684
685 // if no Forwarder subscriptions left
686 if (no_forwarder_subscribers)
687 {
688 // erase the Forwarder subscription
689 forwarder_subscriptions_.erase(it->second);
690
691 // do the actual unsubscribe if we aren't subscribe locally as well
692 if (portal_subscriptions_.count(identifier) == 0)
693 static_cast<Derived*>(this)->_do_portal_unsubscribe(identifier);
694 }
695
696 forwarder_subscription_identifiers_[subscriber_id].erase(it);
697 }
698 }
699
701 const std::shared_ptr<const middleware::SerializationSubscriptionRegex>& new_sub)
702 {
703 if (regex_subscriptions_.empty())
704 static_cast<Derived*>(this)->_do_portal_wildcard_subscribe();
705
706 regex_subscriptions_.insert(std::make_pair(new_sub->subscriber_id(), new_sub));
707 }
708
709 void _publish_serialized(std::string type_name, int scheme, const std::vector<char>& bytes,
711 {
712 std::string identifier =
715 static_cast<Derived*>(this)->_do_publish(identifier, bytes);
716 }
717
718 private:
719 // portal_subscriptions_ and forwarder_subscriptions_: maps identifier to subscription
720 std::unordered_multimap<std::string,
721 std::shared_ptr<const middleware::SerializationHandlerBase<>>>
722 portal_subscriptions_;
723 // only one subscription for each forwarded identifier
724 std::unordered_map<std::string, std::shared_ptr<const middleware::SerializationHandlerBase<>>>
725 forwarder_subscriptions_;
726
727 // maps subscriber_id [thread id as string] to map of identifier to forwarder subscription
728 std::unordered_map<
729 std::string, std::unordered_map<
730 std::string, typename decltype(forwarder_subscriptions_)::const_iterator>>
731 forwarder_subscription_identifiers_;
732
733 // subscriber id to subscription
734 std::unordered_multimap<std::string,
735 std::shared_ptr<const middleware::SerializationSubscriptionRegex>>
736 regex_subscriptions_;
737};
738
739template <typename Derived, typename InnerTransporter, typename ImplementationTag>
741 : public InterProcessTransporterBase<Derived, InnerTransporter, ImplementationTag>,
742 public InterProcessPortalCommon<Derived, InnerTransporter>
743{
744 public:
747
748 InterProcessPortalBase(InnerTransporter& inner) : Base(inner) { _init(); }
750
752
753 friend Base;
754
755 private:
756 void _init()
757 {
759 this->inner().template subscribe<Base::to_portal_group_, SerializerTransporterMessage>(
760 [this](std::shared_ptr<const SerializerTransporterMessage> d)
761 {
762 std::vector<char> data(d->data().begin(), d->data().end());
763 static_cast<Derived*>(this)->_publish_serialized(
764 d->key().type(), d->key().marshalling_scheme(), data,
765 goby::middleware::DynamicGroup(d->key().group()));
766 });
767
768 this->inner().template subscribe<Base::to_portal_group_, SerializationHandlerBase<>>(
769 [this](std::shared_ptr<const middleware::SerializationHandlerBase<>> s)
770 { static_cast<Derived*>(this)->_receive_subscription_forwarded(s); });
771
772 this->inner().template subscribe<Base::to_portal_group_, SerializationSubscriptionRegex>(
773 [this](std::shared_ptr<const middleware::SerializationSubscriptionRegex> s)
774 { static_cast<Derived*>(this)->_subscribe_regex_serialized(s); });
775
776 this->inner().template subscribe<Base::to_portal_group_, SerializationUnSubscribeAll>(
777 [this](std::shared_ptr<const middleware::SerializationUnSubscribeAll> s)
778 { static_cast<Derived*>(this)->_unsubscribe_all(s->subscriber_id()); });
779 }
780};
781
782} // namespace middleware
783} // namespace goby
784
786
787namespace goby
788{
789namespace middleware
790{
791
797template <typename InnerTransporter>
798class InterProcessForwarder<InnerTransporter, void>
799 : public InterProcessForwarder<InnerTransporter, zeromq::detail::InterProcessTag>
800{
801 public:
802 using Base = InterProcessForwarder<InnerTransporter, zeromq::detail::InterProcessTag>;
803
804 [[deprecated("Use zeromq::InterProcessForwarder<> or udpm::InterProcessForwarder<> instead of "
805 "middleware::InterProcessForwarder<>")]]
806 explicit InterProcessForwarder(InnerTransporter& inner)
807 : Base(inner)
808 {
809 }
810};
811
812} // namespace middleware
813} // namespace goby
814
815#endif
simple exception class for goby applications
Definition exception.h:35
Implementation of Group for dynamic (run-time) instantiations. Use Group directly for static (compile...
Definition group.h:120
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:60
Implements the forwarder concept for the interprocess layer.
InterProcessForwarder(InnerTransporter &inner)
Construct a forwarder for the interprocess layer.
InterProcessTransporterBase< InterProcessForwarder< InnerTransporter, ImplementationTag >, InnerTransporter, ImplementationTag > Base
std::string _make_identifier(const goby::middleware::Group &group, IdentifierWildcard wildcard)
Definition identifier.h:88
static std::tuple< std::string, int, std::string, int, std::size_t > parse_identifier(const std::string &identifier)
InterProcessPortalBase(InnerTransporter &inner)
void _subscribe(std::function< void(std::shared_ptr< const Data > d)> f, const goby::middleware::Group &group, const middleware::Subscriber< Data > &)
void _handle_received_data(std::unique_ptr< std::unique_lock< std::mutex > > &lock, const std::string &data)
void _publish_serialized(std::string type_name, int scheme, const std::vector< char > &bytes, const goby::middleware::Group &group)
void _publish(const Data &d, const goby::middleware::Group &group, const middleware::Publisher< Data > &)
void _unsubscribe_all(const std::string &subscriber_id=middleware::identifier_part_to_string(std::this_thread::get_id()))
void _forwarder_unsubscribe(const std::string &subscriber_id, const std::string &identifier)
void _unsubscribe(const goby::middleware::Group &group, const middleware::Subscriber< Data > &=middleware::Subscriber< Data >())
std::shared_ptr< middleware::SerializationSubscriptionRegex > _subscribe_regex(std::function< void(const std::vector< unsigned char > &, int scheme, const std::string &type, const goby::middleware::Group &group)> f, const std::set< int > &schemes, const std::string &type_regex, const std::string &group_regex)
void _subscribe_regex_serialized(const std::shared_ptr< const middleware::SerializationSubscriptionRegex > &new_sub)
void _receive_subscription_forwarded(const std::shared_ptr< const middleware::SerializationHandlerBase<> > &subscription)
Base class for implementing transporters (both portal and forwarder) for the interprocess layer.
void check_validity()
Check validity of the Group for interthread use (at compile time)
static constexpr int scheme()
returns the marshalling scheme id for a given data type on this layer
void unsubscribe_dynamic(const Group &group, const Subscriber< Data > &subscriber=Subscriber< Data >())
Unsubscribe to a specific run-time defined group and data type. Where possible, prefer the static var...
void check_validity_runtime(const Group &group)
Check validity of the Group for interthread use (for DynamicGroup at run time)
ImplementationTag implementation_tag
The ImplementationTag for this transporter (allows InterVehiclePortal to match the driver thread's fo...
void subscribe_dynamic(std::function< void(std::shared_ptr< const Data >)> f, const Group &group, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (shared pointer variant)....
void publish_dynamic(const Data &data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (const reference variant)....
void publish_serialized(std::string type_name, int scheme, const std::vector< char > &bytes, const goby::middleware::Group &group)
Publish a message that has already been serialized for the given scheme.
void subscribe_type_regex(std::function< void(std::shared_ptr< const Data >, const std::string &type)> f, const std::string &type_regex=".*")
Subscribe to a number of types within a given group and scheme using a regular expression.
InterProcessTransporterBase(InnerTransporter &inner)
std::shared_ptr< SerializationSubscriptionRegex > subscribe_type_regex(std::function< void(std::shared_ptr< const Data >, const std::string &type)> f, const Group &group, const std::string &type_regex=".*")
Subscribe to a number of types within a given group and scheme using a regular expression.
void subscribe_dynamic(std::function< void(const Data &)> f, const Group &group, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (const reference variant)....
void publish_dynamic(std::shared_ptr< Data > data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to mutable data variant)....
std::shared_ptr< SerializationSubscriptionRegex > subscribe_regex(std::function< void(const std::vector< unsigned char > &, int scheme, const std::string &type, const Group &group)> f, const std::set< int > &schemes, const std::string &type_regex=".*", const std::string &group_regex=".*")
Subscribe to multiple groups and/or types at once using regular expressions.
void publish_dynamic(std::shared_ptr< const Data > data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to const data variant)....
void unsubscribe_all()
Unsubscribe from all current subscriptions.
Utility class for allowing the various Goby middleware transporters to poll the underlying transport ...
Definition poller.h:38
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition publisher.h:40
const goby::middleware::protobuf::TransporterConfig & cfg() const
Returns the metadata configuration.
Definition publisher.h:81
Base class for handling posting callbacks for serialized data types (interprocess and outer)
Defines the common interface for publishing and subscribing data using static (constexpr) groups on G...
Definition interface.h:234
void subscribe(std::function< void(const Data &)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (const reference variant)
Definition interface.h:300
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
Definition subscriber.h:37
goby::util::logger::GroupSetter group(std::string n)
constexpr auto concat(const char(&a)[N1], const char(&b)[N2])
constexpr int scheme()
Placeholder to provide an interface for the scheme() function family.
Definition cstr.h:65
std::string identifier_part_to_string(int i)
Definition identifier.h:60
middleware::InterProcessForwarder< InnerTransporter, detail::InterProcessTag > InterProcessForwarder
The global namespace for the Goby project.
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::PROTOBUF_NAMESPACE_ID::MessageOptions, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
util::FlexOstream glog
Access the Goby logger through this object.
type
Generic JSON types used in JWTs.
Definition jwt.h:2072
STL namespace.
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
Definition interface.h:98
static std::string type_name()
The marshalling scheme specific string name for this type.
Definition interface.h:107
static std::shared_ptr< DataType > parse(CharIterator bytes_begin, CharIterator bytes_end, CharIterator &actual_end, const std::string &type=type_name())
Given a beginning and end iterator to bytes, parse the data and return it.
Definition interface.h:129