25#ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERPROCESS_H
26#define GOBY_MIDDLEWARE_TRANSPORT_INTERPROCESS_H
54template <
typename Derived,
typename InnerTransporter,
typename ImplementationTag>
57 InterProcessTransporterBase<Derived, InnerTransporter, ImplementationTag>,
59 public Poller<InterProcessTransporterBase<Derived, InnerTransporter, ImplementationTag>>
84 template <
typename Data>
static constexpr int scheme()
86 int scheme = goby::middleware::scheme<Data>();
100 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
105 static_cast<Derived*
>(
this)->
template _publish<Data, scheme>(data,
group, publisher);
106 this->
inner().template publish_dynamic<Data, scheme>(data,
group, publisher);
116 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
123 static_cast<Derived*
>(
this)->
template _publish<Data, scheme>(*data,
group, publisher);
124 this->
inner().template publish_dynamic<Data, scheme>(data,
group, publisher);
135 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
139 publish_dynamic<Data, scheme>(std::shared_ptr<const Data>(data),
group, publisher);
147 static_cast<Derived*
>(
this)->_publish_serialized(type_name,
scheme, bytes,
group);
157 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
162 static_cast<Derived*
>(
this)->
template _subscribe<Data, scheme>(
163 [=](std::shared_ptr<const Data> d) { f(*d); },
group, subscriber);
173 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
178 static_cast<Derived*
>(
this)->
template _subscribe<Data, scheme>(f,
group, subscriber);
186 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
191 static_cast<Derived*
>(
this)->
template _unsubscribe<Data, scheme>(
group, subscriber);
204 std::shared_ptr<SerializationSubscriptionRegex>
208 const std::set<int>& schemes,
const std::string& type_regex =
".*",
209 const std::string& group_regex =
".*")
211 return static_cast<Derived*
>(
this)->_subscribe_regex(f, schemes, type_regex, group_regex);
223 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
225 std::function<
void(std::shared_ptr<const Data>,
const std::string& type)> f,
226 const Group&
group,
const std::string& type_regex =
".*")
228 std::regex special_chars{R
"([-[\]{}()*+?.,\^$|#\s])"};
229 std::string sanitized_group =
230 std::regex_replace(std::string(group), special_chars, R"(\$&)");
232 auto regex_lambda = [=](
const std::vector<unsigned char>& data,
int schm,
233 const std::string& type,
const Group& grp)
235 auto data_begin = data.begin(), data_end = data.end(), actual_end = data.end();
241 return static_cast<Derived*
>(
this)->_subscribe_regex(regex_lambda, {
scheme}, type_regex,
242 "^" + sanitized_group +
"$");
254 int scheme = InterProcessTransporterBase::scheme<Data>()>
256 std::function<
void(std::shared_ptr<const Data>,
const std::string& type)> f,
257 const std::string& type_regex =
".*")
267 static_assert((
group.c_str() !=
nullptr) && (
group.c_str()[0] !=
'\0'),
268 "goby::middleware::Group must have non-zero length string to publish on the "
269 "InterProcess layer");
275 if ((
group.c_str() ==
nullptr) || (
group.c_str()[0] ==
'\0'))
276 throw(
goby::Exception(
"Group must have a non-empty string for use on InterProcess"));
293 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock)
295 return static_cast<Derived*
>(
this)->_poll(lock);
304template <
typename InnerTransporter,
typename ImplementationTag =
void>
class InterProcessForwarder;
310template <
typename InnerTransporter,
typename ImplementationTag>
313 InnerTransporter, ImplementationTag>
318 InnerTransporter, ImplementationTag>;
330 std::shared_ptr<const goby::middleware::protobuf::SerializerTransporterMessage>
331 msg) { _receive_regex_data_forwarded(
msg); });
350 template <
typename Data,
int scheme>
355 std::string* sbytes =
new std::string(bytes.begin(), bytes.end());
356 auto msg = std::make_shared<goby::middleware::protobuf::SerializerTransporterMessage>();
357 auto* key =
msg->mutable_key();
359 key->set_marshalling_scheme(
scheme);
361 key->set_group(std::string(
group));
362 msg->set_allocated_data(sbytes);
364 *key->mutable_cfg() = publisher.
cfg();
366 this->
inner().template publish<Base::to_portal_group_>(
msg);
369 template <
typename Data,
int scheme>
370 void _subscribe(std::function<
void(std::shared_ptr<const Data> d)> f,
const Group&
group,
373 this->
inner().template subscribe_dynamic<Data, scheme>(f,
group);
379 auto inner_publication_lambda = [=](std::shared_ptr<const Data> d)
382 this->
inner().template publish_dynamic<Data, scheme>(d,
group);
385 auto subscription = std::make_shared<SerializationSubscription<Data, scheme>>(
386 inner_publication_lambda,
group,
388 [=](
const Data& d) {
return group; }));
390 this->
inner().template publish<Base::to_portal_group_, SerializationHandlerBase<>>(
394 template <
typename Data,
int scheme>
395 void _unsubscribe(
const Group&
group,
const Subscriber<Data>& subscriber)
397 this->
inner().template unsubscribe_dynamic<Data, scheme>(
group, subscriber);
399 auto unsubscription = std::shared_ptr<SerializationHandlerBase<>>(
400 new SerializationUnSubscription<Data, scheme>(
group));
402 this->
inner().template publish<Base::to_portal_group_, SerializationHandlerBase<>>(
406 void _publish_serialized(std::string type_name,
int scheme,
const std::vector<char>& bytes,
409 auto msg = std::make_shared<goby::middleware::protobuf::SerializerTransporterMessage>();
410 auto* key =
msg->mutable_key();
412 key->set_marshalling_scheme(
scheme);
413 key->set_type(type_name);
414 key->set_group(std::string(
group));
415 msg->set_data(std::string(bytes.begin(), bytes.end()));
417 this->
inner().template publish<Base::to_portal_group_>(
msg);
420 void _unsubscribe_all()
422 regex_subscriptions_.clear();
423 auto all = std::make_shared<SerializationUnSubscribeAll>();
424 this->
inner().template publish<Base::to_portal_group_, SerializationUnSubscribeAll>(all);
427 std::shared_ptr<SerializationSubscriptionRegex>
428 _subscribe_regex(std::function<
void(
const std::vector<unsigned char>&,
int scheme,
429 const std::string& type,
const Group&
group)>
431 const std::set<int>& schemes,
const std::string& type_regex =
".*",
432 const std::string& group_regex =
".*")
435 auto inner_publication_lambda = [=](
const std::vector<unsigned char>& data,
int scheme,
436 const std::string&
type,
const Group&
group)
440 std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage>
442 forwarded_data->mutable_key()->set_marshalling_scheme(
scheme);
443 forwarded_data->mutable_key()->set_type(type);
444 forwarded_data->mutable_key()->set_group(
group);
445 forwarded_data->set_data(std::string(data.begin(), data.end()));
446 this->
inner().template publish<Base::regex_group_>(forwarded_data);
449 auto portal_subscription = std::make_shared<SerializationSubscriptionRegex>(
450 inner_publication_lambda, schemes, type_regex, group_regex);
451 this->
inner().template publish<Base::to_portal_group_, SerializationSubscriptionRegex>(
452 portal_subscription);
454 auto local_subscription = std::shared_ptr<SerializationSubscriptionRegex>(
455 new SerializationSubscriptionRegex(f, schemes, type_regex, group_regex));
456 regex_subscriptions_.insert(local_subscription);
457 return local_subscription;
460 void _receive_regex_data_forwarded(
461 std::shared_ptr<const goby::middleware::protobuf::SerializerTransporterMessage>
msg)
463 const auto& bytes =
msg->data();
464 for (
auto& sub : regex_subscriptions_)
465 sub->post(bytes.begin(), bytes.end(),
msg->key().marshalling_scheme(),
469 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock)
475 std::set<std::shared_ptr<const SerializationSubscriptionRegex>> regex_subscriptions_;
478 std::shared_ptr<std::atomic<bool>> alive_;
481template <
typename Derived,
typename InnerTransporter>
485 template <
typename Data,
int scheme>
495 std::function<
void(
const std::vector<unsigned char>&,
int scheme,
const std::string& type,
498 const std::set<int>& schemes,
const std::string& type_regex,
const std::string& group_regex)
500 auto new_sub = std::make_shared<middleware::SerializationSubscriptionRegex>(
501 f, schemes, type_regex, group_regex);
506 template <
typename Data,
int scheme>
507 void _subscribe(std::function<
void(std::shared_ptr<const Data> d)> f,
511 std::string identifier = this->
template _make_identifier<Data, scheme>(
514 auto subscription = std::make_shared<middleware::SerializationSubscription<Data, scheme>>(
517 [=](
const Data& ) {
return group; }));
519 if (forwarder_subscriptions_.count(identifier) == 0 &&
520 portal_subscriptions_.count(identifier) == 0)
521 static_cast<Derived*
>(
this)->_do_portal_subscribe(identifier);
523 portal_subscriptions_.insert(std::make_pair(identifier, subscription));
526 template <
typename Data,
int scheme>
531 std::string identifier = this->
template _make_identifier<Data, scheme>(
534 portal_subscriptions_.erase(identifier);
537 if (forwarder_subscriptions_.count(identifier) == 0)
538 static_cast<Derived*
>(
this)->_do_portal_unsubscribe(identifier);
541 const std::string& data)
546 std::string
group, type;
554 std::vector<std::weak_ptr<const middleware::SerializationHandlerBase<>>> subs_to_post;
555 auto portal_range = portal_subscriptions_.equal_range(identifier);
556 for (
auto it = portal_range.first; it != portal_range.second; ++it)
557 subs_to_post.push_back(it->second);
558 auto forwarder_it = forwarder_subscriptions_.find(identifier);
559 if (forwarder_it != forwarder_subscriptions_.end())
560 subs_to_post.push_back(forwarder_it->second);
564 auto null_delim_it = std::find(std::begin(data), std::end(data),
566 for (
auto& sub : subs_to_post)
568 if (
auto sub_sp = sub.lock())
569 sub_sp->post(null_delim_it + 1, data.end());
573 if (!regex_subscriptions_.empty())
575 auto null_delim_it = std::find(std::begin(data), std::end(data),
578 bool forwarder_subscription_posted =
false;
579 for (
auto& sub : regex_subscriptions_)
582 bool is_forwarded_sub =
584 if (is_forwarded_sub && forwarder_subscription_posted)
587 if (sub.second->post(null_delim_it + 1, data.end(),
scheme, type,
group) &&
589 forwarder_subscription_posted =
true;
600 for (
const auto& p : portal_subscriptions_)
602 const auto& identifier = p.first;
603 if (forwarder_subscriptions_.count(identifier) == 0)
604 static_cast<Derived*
>(
this)->_do_portal_unsubscribe(identifier);
606 portal_subscriptions_.clear();
610 while (forwarder_subscription_identifiers_[subscriber_id].size() > 0)
613 forwarder_subscription_identifiers_[subscriber_id].begin()->first);
617 if (regex_subscriptions_.size() > 0)
619 regex_subscriptions_.erase(subscriber_id);
620 if (regex_subscriptions_.empty())
621 static_cast<Derived*
>(
this)->_do_portal_wildcard_unsubscribe();
629 subscription->type_name(), subscription->scheme(), subscription->subscribed_group(),
633 goby::glog <<
"Received subscription forwarded for identifier [" << identifier
634 <<
"] from subscriber id: " << subscription->subscriber_id() << std::endl;
636 switch (subscription->action())
641 if (forwarder_subscription_identifiers_[subscription->subscriber_id()].count(
645 if (forwarder_subscriptions_.count(identifier) == 0)
648 if (portal_subscriptions_.count(identifier) == 0)
649 static_cast<Derived*
>(
this)->_do_portal_subscribe(identifier);
652 forwarder_subscriptions_.insert(std::make_pair(identifier, subscription));
654 forwarder_subscription_identifiers_[subscription->subscriber_id()].insert(
655 std::make_pair(identifier, forwarder_subscriptions_.find(identifier)));
672 auto it = forwarder_subscription_identifiers_[subscriber_id].find(identifier);
673 if (it != forwarder_subscription_identifiers_[subscriber_id].end())
675 bool no_forwarder_subscribers =
true;
676 for (
const auto& p : forwarder_subscription_identifiers_)
678 if (p.second.count(identifier) != 0)
680 no_forwarder_subscribers =
false;
686 if (no_forwarder_subscribers)
689 forwarder_subscriptions_.erase(it->second);
692 if (portal_subscriptions_.count(identifier) == 0)
693 static_cast<Derived*
>(
this)->_do_portal_unsubscribe(identifier);
696 forwarder_subscription_identifiers_[subscriber_id].erase(it);
701 const std::shared_ptr<const middleware::SerializationSubscriptionRegex>& new_sub)
703 if (regex_subscriptions_.empty())
704 static_cast<Derived*
>(
this)->_do_portal_wildcard_subscribe();
706 regex_subscriptions_.insert(std::make_pair(new_sub->subscriber_id(), new_sub));
712 std::string identifier =
715 static_cast<Derived*
>(
this)->_do_publish(identifier, bytes);
720 std::unordered_multimap<std::string,
721 std::shared_ptr<const middleware::SerializationHandlerBase<>>>
722 portal_subscriptions_;
724 std::unordered_map<std::string, std::shared_ptr<const middleware::SerializationHandlerBase<>>>
725 forwarder_subscriptions_;
729 std::string, std::unordered_map<
730 std::string,
typename decltype(forwarder_subscriptions_)::const_iterator>>
731 forwarder_subscription_identifiers_;
734 std::unordered_multimap<std::string,
735 std::shared_ptr<const middleware::SerializationSubscriptionRegex>>
736 regex_subscriptions_;
739template <
typename Derived,
typename InnerTransporter,
typename ImplementationTag>
759 this->
inner().template subscribe<Base::to_portal_group_, SerializerTransporterMessage>(
760 [
this](std::shared_ptr<const SerializerTransporterMessage> d)
762 std::vector<char> data(d->data().begin(), d->data().end());
764 d->key().type(), d->key().marshalling_scheme(), data,
768 this->
inner().template subscribe<Base::to_portal_group_, SerializationHandlerBase<>>(
769 [
this](std::shared_ptr<const middleware::SerializationHandlerBase<>> s)
772 this->
inner().template subscribe<Base::to_portal_group_, SerializationSubscriptionRegex>(
773 [
this](std::shared_ptr<const middleware::SerializationSubscriptionRegex> s)
776 this->
inner().template subscribe<Base::to_portal_group_, SerializationUnSubscribeAll>(
777 [
this](std::shared_ptr<const middleware::SerializationUnSubscribeAll> s)
797template <
typename InnerTransporter>
802 using Base = InterProcessForwarder<InnerTransporter, zeromq::detail::InterProcessTag>;
804 [[deprecated(
"Use zeromq::InterProcessForwarder<> or udpm::InterProcessForwarder<> instead of "
805 "middleware::InterProcessForwarder<>")]]
simple exception class for goby applications
Implementation of Group for dynamic (run-time) instantiations. Use Group directly for static (compile...
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
InnerTransporter & inner()
Implements the forwarder concept for the interprocess layer.
InterProcessForwarder(InnerTransporter &inner)
Construct a forwarder for the interprocess layer.
InterProcessTransporterBase< InterProcessForwarder< InnerTransporter, ImplementationTag >, InnerTransporter, ImplementationTag > Base
virtual ~InterProcessForwarder()
std::string _make_identifier(const goby::middleware::Group &group, IdentifierWildcard wildcard)
static std::tuple< std::string, int, std::string, int, std::size_t > parse_identifier(const std::string &identifier)
static const char end_delimiter
InterProcessPortalBase(InnerTransporter &inner)
virtual ~InterProcessPortalBase()
void _subscribe(std::function< void(std::shared_ptr< const Data > d)> f, const goby::middleware::Group &group, const middleware::Subscriber< Data > &)
void _handle_received_data(std::unique_ptr< std::unique_lock< std::mutex > > &lock, const std::string &data)
void _publish_serialized(std::string type_name, int scheme, const std::vector< char > &bytes, const goby::middleware::Group &group)
void _publish(const Data &d, const goby::middleware::Group &group, const middleware::Publisher< Data > &)
void _unsubscribe_all(const std::string &subscriber_id=middleware::identifier_part_to_string(std::this_thread::get_id()))
void _forwarder_unsubscribe(const std::string &subscriber_id, const std::string &identifier)
void _unsubscribe(const goby::middleware::Group &group, const middleware::Subscriber< Data > &=middleware::Subscriber< Data >())
std::shared_ptr< middleware::SerializationSubscriptionRegex > _subscribe_regex(std::function< void(const std::vector< unsigned char > &, int scheme, const std::string &type, const goby::middleware::Group &group)> f, const std::set< int > &schemes, const std::string &type_regex, const std::string &group_regex)
void _subscribe_regex_serialized(const std::shared_ptr< const middleware::SerializationSubscriptionRegex > &new_sub)
void _receive_subscription_forwarded(const std::shared_ptr< const middleware::SerializationHandlerBase<> > &subscription)
Base class for implementing transporters (both portal and forwarder) for the interprocess layer.
virtual ~InterProcessTransporterBase()
void check_validity()
Check validity of the Group for interthread use (at compile time)
static constexpr int scheme()
returns the marshalling scheme id for a given data type on this layer
static constexpr auto from_portal_group_name_
void unsubscribe_dynamic(const Group &group, const Subscriber< Data > &subscriber=Subscriber< Data >())
Unsubscribe to a specific run-time defined group and data type. Where possible, prefer the static var...
static constexpr Group to_portal_group_
void check_validity_runtime(const Group &group)
Check validity of the Group for interthread use (for DynamicGroup at run time)
ImplementationTag implementation_tag
The ImplementationTag for this transporter (allows InterVehiclePortal to match the driver thread's fo...
static constexpr Group regex_group_
void subscribe_dynamic(std::function< void(std::shared_ptr< const Data >)> f, const Group &group, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (shared pointer variant)....
void publish_dynamic(const Data &data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (const reference variant)....
static constexpr Group from_portal_group_
void publish_serialized(std::string type_name, int scheme, const std::vector< char > &bytes, const goby::middleware::Group &group)
Publish a message that has already been serialized for the given scheme.
void subscribe_type_regex(std::function< void(std::shared_ptr< const Data >, const std::string &type)> f, const std::string &type_regex=".*")
Subscribe to a number of types within a given group and scheme using a regular expression.
InterProcessTransporterBase(InnerTransporter &inner)
std::shared_ptr< SerializationSubscriptionRegex > subscribe_type_regex(std::function< void(std::shared_ptr< const Data >, const std::string &type)> f, const Group &group, const std::string &type_regex=".*")
Subscribe to a number of types within a given group and scheme using a regular expression.
void subscribe_dynamic(std::function< void(const Data &)> f, const Group &group, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (const reference variant)....
static constexpr auto to_portal_group_name_
void publish_dynamic(std::shared_ptr< Data > data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to mutable data variant)....
InterProcessTransporterBase()
std::shared_ptr< SerializationSubscriptionRegex > subscribe_regex(std::function< void(const std::vector< unsigned char > &, int scheme, const std::string &type, const Group &group)> f, const std::set< int > &schemes, const std::string &type_regex=".*", const std::string &group_regex=".*")
Subscribe to multiple groups and/or types at once using regular expressions.
void publish_dynamic(std::shared_ptr< const Data > data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to const data variant)....
void unsubscribe_all()
Unsubscribe from all current subscriptions.
static constexpr auto regex_group_name_
Utility class for allowing the various Goby middleware transporters to poll the underlying transport ...
Class that holds additional metadata and callback functions related to a publication (and is optional...
const goby::middleware::protobuf::TransporterConfig & cfg() const
Returns the metadata configuration.
Base class for handling posting callbacks for serialized data types (interprocess and outer)
Defines the common interface for publishing and subscribing data using static (constexpr) groups on G...
void subscribe(std::function< void(const Data &)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (const reference variant)
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
goby::util::logger::GroupSetter group(std::string n)
constexpr auto concat(const char(&a)[N1], const char(&b)[N2])
constexpr int scheme()
Placeholder to provide an interface for the scheme() function family.
@ PROCESS_THREAD_WILDCARD
std::string identifier_part_to_string(int i)
middleware::InterProcessForwarder< InnerTransporter, detail::InterProcessTag > InterProcessForwarder
The global namespace for the Goby project.
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::PROTOBUF_NAMESPACE_ID::MessageOptions, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
util::FlexOstream glog
Access the Goby logger through this object.
type
Generic JSON types used in JWTs.
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
static std::string type_name()
The marshalling scheme specific string name for this type.
static std::shared_ptr< DataType > parse(CharIterator bytes_begin, CharIterator bytes_end, CharIterator &actual_end, const std::string &type=type_name())
Given a beginning and end iterator to bytes, parse the data and return it.