26#ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERPROCESS_H
27#define GOBY_MIDDLEWARE_TRANSPORT_INTERPROCESS_H
55template <
typename Derived,
typename InnerTransporter,
typename ImplementationTag>
58 InterProcessTransporterBase<Derived, InnerTransporter, ImplementationTag>,
60 public Poller<InterProcessTransporterBase<Derived, InnerTransporter, ImplementationTag>>
85 template <
typename Data>
static constexpr int scheme()
87 int scheme = goby::middleware::scheme<Data>();
101 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
106 static_cast<Derived*
>(
this)->
template _publish<Data, scheme>(data,
group, publisher);
107 this->
inner().template publish_dynamic<Data, scheme>(data,
group, publisher);
117 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
124 static_cast<Derived*
>(
this)->
template _publish<Data, scheme>(*data,
group, publisher);
125 this->
inner().template publish_dynamic<Data, scheme>(data,
group, publisher);
136 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
140 publish_dynamic<Data, scheme>(std::shared_ptr<const Data>(data),
group, publisher);
148 static_cast<Derived*
>(
this)->_publish_serialized(type_name,
scheme, bytes,
group);
158 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
163 static_cast<Derived*
>(
this)->
template _subscribe<Data, scheme>(
164 [=](std::shared_ptr<const Data> d) { f(*d); },
group, subscriber);
174 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
179 static_cast<Derived*
>(
this)->
template _subscribe<Data, scheme>(f,
group, subscriber);
187 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
192 static_cast<Derived*
>(
this)->
template _unsubscribe<Data, scheme>(
group, subscriber);
205 std::shared_ptr<SerializationSubscriptionRegex>
209 const std::set<int>& schemes,
const std::string& type_regex =
".*",
210 const std::string& group_regex =
".*")
212 return static_cast<Derived*
>(
this)->_subscribe_regex(f, schemes, type_regex, group_regex);
224 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
226 std::function<
void(std::shared_ptr<const Data>,
const std::string& type)> f,
227 const Group&
group,
const std::string& type_regex =
".*")
229 std::regex special_chars{R
"([-[\]{}()*+?.,\^$|#\s])"};
230 std::string sanitized_group =
231 std::regex_replace(std::string(group), special_chars, R"(\$&)");
233 auto regex_lambda = [f = std::move(f)](
const std::vector<unsigned char>& data,
int schm,
234 const std::string& type,
const Group& grp)
236 auto data_begin = data.begin(), data_end = data.end(), actual_end = data.end();
242 return static_cast<Derived*
>(
this)->_subscribe_regex(regex_lambda, {
scheme}, type_regex,
243 "^" + sanitized_group +
"$");
255 int scheme = InterProcessTransporterBase::scheme<Data>()>
257 std::function<
void(std::shared_ptr<const Data>,
const std::string& type)> f,
258 const std::string& type_regex =
".*")
268 static_assert((
group.c_str() !=
nullptr) && (
group.c_str()[0] !=
'\0'),
269 "goby::middleware::Group must have non-zero length string to publish on the "
270 "InterProcess layer");
276 if ((
group.c_str() ==
nullptr) || (
group.c_str()[0] ==
'\0'))
277 throw(
goby::Exception(
"Group must have a non-empty string for use on InterProcess"));
294 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock)
296 return static_cast<Derived*
>(
this)->_poll(lock);
// Forward declaration of the InterProcessForwarder class template; ImplementationTag defaults to void when omitted.
305template <
typename InnerTransporter,
typename ImplementationTag =
void>
class InterProcessForwarder;
311template <
typename InnerTransporter,
typename ImplementationTag>
314 InnerTransporter, ImplementationTag>
319 InnerTransporter, ImplementationTag>;
331 std::shared_ptr<const goby::middleware::protobuf::SerializerTransporterMessage>
332 msg) { _receive_regex_data_forwarded(
msg); });
351 template <
typename Data,
int scheme>
356 std::string* sbytes =
new std::string(bytes.begin(), bytes.end());
357 auto msg = std::make_shared<goby::middleware::protobuf::SerializerTransporterMessage>();
358 auto* key =
msg->mutable_key();
360 key->set_marshalling_scheme(
scheme);
362 key->set_group(std::string(
group));
363 msg->set_allocated_data(sbytes);
365 *key->mutable_cfg() = publisher.
cfg();
367 this->
inner().template publish<Base::to_portal_group_>(
msg);
370 template <
typename Data,
int scheme>
371 void _subscribe(std::function<
void(std::shared_ptr<const Data> d)> f,
const Group&
group,
374 this->
inner().template subscribe_dynamic<Data, scheme>(f,
group);
380 auto inner_publication_lambda = [=](std::shared_ptr<const Data> d)
383 this->
inner().template publish_dynamic<Data, scheme>(d,
group);
386 auto subscription = std::make_shared<SerializationSubscription<Data, scheme>>(
387 inner_publication_lambda,
group,
389 [=](
const Data& d) {
return group; }));
391 this->
inner().template publish<Base::to_portal_group_, SerializationHandlerBase<>>(
395 template <
typename Data,
int scheme>
396 void _unsubscribe(
const Group&
group,
const Subscriber<Data>& subscriber)
398 this->
inner().template unsubscribe_dynamic<Data, scheme>(
group, subscriber);
400 auto unsubscription = std::shared_ptr<SerializationHandlerBase<>>(
401 new SerializationUnSubscription<Data, scheme>(
group));
403 this->
inner().template publish<Base::to_portal_group_, SerializationHandlerBase<>>(
407 void _publish_serialized(std::string type_name,
int scheme,
const std::vector<char>& bytes,
410 auto msg = std::make_shared<goby::middleware::protobuf::SerializerTransporterMessage>();
411 auto* key =
msg->mutable_key();
413 key->set_marshalling_scheme(
scheme);
414 key->set_type(type_name);
415 key->set_group(std::string(
group));
416 msg->set_data(std::string(bytes.begin(), bytes.end()));
418 this->
inner().template publish<Base::to_portal_group_>(
msg);
421 void _unsubscribe_all()
423 regex_subscriptions_.clear();
424 auto all = std::make_shared<SerializationUnSubscribeAll>();
425 this->
inner().template publish<Base::to_portal_group_, SerializationUnSubscribeAll>(all);
428 std::shared_ptr<SerializationSubscriptionRegex>
429 _subscribe_regex(std::function<
void(
const std::vector<unsigned char>&,
int scheme,
430 const std::string& type,
const Group&
group)>
432 const std::set<int>& schemes,
const std::string& type_regex =
".*",
433 const std::string& group_regex =
".*")
436 auto inner_publication_lambda = [=](
const std::vector<unsigned char>& data,
int scheme,
437 const std::string&
type,
const Group&
group)
441 std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage>
443 forwarded_data->mutable_key()->set_marshalling_scheme(
scheme);
444 forwarded_data->mutable_key()->set_type(type);
445 forwarded_data->mutable_key()->set_group(
group);
446 forwarded_data->set_data(std::string(data.begin(), data.end()));
447 this->
inner().template publish<Base::regex_group_>(forwarded_data);
450 auto portal_subscription = std::make_shared<SerializationSubscriptionRegex>(
451 inner_publication_lambda, schemes, type_regex, group_regex);
452 this->
inner().template publish<Base::to_portal_group_, SerializationSubscriptionRegex>(
453 portal_subscription);
455 auto local_subscription = std::shared_ptr<SerializationSubscriptionRegex>(
456 new SerializationSubscriptionRegex(f, schemes, type_regex, group_regex));
457 regex_subscriptions_.insert(local_subscription);
458 return local_subscription;
461 void _receive_regex_data_forwarded(
462 std::shared_ptr<const goby::middleware::protobuf::SerializerTransporterMessage>
msg)
464 const auto& bytes =
msg->data();
465 for (
auto& sub : regex_subscriptions_)
466 sub->post(bytes.begin(), bytes.end(),
msg->key().marshalling_scheme(),
470 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock)
476 std::set<std::shared_ptr<const SerializationSubscriptionRegex>> regex_subscriptions_;
479 std::shared_ptr<std::atomic<bool>> alive_;
482template <
typename Derived,
typename InnerTransporter>
486 template <
typename Data,
int scheme>
496 std::function<
void(
const std::vector<unsigned char>&,
int scheme,
const std::string& type,
499 const std::set<int>& schemes,
const std::string& type_regex,
const std::string& group_regex)
501 auto new_sub = std::make_shared<middleware::SerializationSubscriptionRegex>(
502 f, schemes, type_regex, group_regex);
507 template <
typename Data,
int scheme>
508 void _subscribe(std::function<
void(std::shared_ptr<const Data> d)> f,
512 std::string identifier = this->
template _make_identifier<Data, scheme>(
515 auto subscription = std::make_shared<middleware::SerializationSubscription<Data, scheme>>(
518 [=](
const Data& ) {
return group; }));
520 if (forwarder_subscriptions_.count(identifier) == 0 &&
521 portal_subscriptions_.count(identifier) == 0)
522 static_cast<Derived*
>(
this)->_do_portal_subscribe(identifier);
524 portal_subscriptions_.insert(std::make_pair(identifier, subscription));
527 template <
typename Data,
int scheme>
532 std::string identifier = this->
template _make_identifier<Data, scheme>(
535 portal_subscriptions_.erase(identifier);
538 if (forwarder_subscriptions_.count(identifier) == 0)
539 static_cast<Derived*
>(
this)->_do_portal_unsubscribe(identifier);
542 const std::string& data)
547 std::string
group, type;
555 std::vector<std::weak_ptr<const middleware::SerializationHandlerBase<>>> subs_to_post;
556 auto portal_range = portal_subscriptions_.equal_range(identifier);
557 for (
auto it = portal_range.first; it != portal_range.second; ++it)
558 subs_to_post.push_back(it->second);
559 auto forwarder_it = forwarder_subscriptions_.find(identifier);
560 if (forwarder_it != forwarder_subscriptions_.end())
561 subs_to_post.push_back(forwarder_it->second);
565 auto null_delim_it = std::find(std::begin(data), std::end(data),
567 for (
auto& sub : subs_to_post)
569 if (
auto sub_sp = sub.lock())
570 sub_sp->post(null_delim_it + 1, data.end());
574 if (!regex_subscriptions_.empty())
576 auto null_delim_it = std::find(std::begin(data), std::end(data),
579 bool forwarder_subscription_posted =
false;
580 for (
auto& sub : regex_subscriptions_)
583 bool is_forwarded_sub =
585 if (is_forwarded_sub && forwarder_subscription_posted)
588 if (sub.second->post(null_delim_it + 1, data.end(),
scheme, type,
group) &&
590 forwarder_subscription_posted =
true;
601 for (
const auto& p : portal_subscriptions_)
603 const auto& identifier = p.first;
604 if (forwarder_subscriptions_.count(identifier) == 0)
605 static_cast<Derived*
>(
this)->_do_portal_unsubscribe(identifier);
607 portal_subscriptions_.clear();
611 while (forwarder_subscription_identifiers_[subscriber_id].size() > 0)
614 forwarder_subscription_identifiers_[subscriber_id].begin()->first);
618 if (regex_subscriptions_.size() > 0)
620 regex_subscriptions_.erase(subscriber_id);
621 if (regex_subscriptions_.empty())
622 static_cast<Derived*
>(
this)->_do_portal_wildcard_unsubscribe();
630 subscription->type_name(), subscription->scheme(), subscription->subscribed_group(),
634 goby::glog <<
"Received subscription forwarded for identifier [" << identifier
635 <<
"] from subscriber id: " << subscription->subscriber_id() << std::endl;
637 switch (subscription->action())
642 if (forwarder_subscription_identifiers_[subscription->subscriber_id()].count(
646 if (forwarder_subscriptions_.count(identifier) == 0)
649 if (portal_subscriptions_.count(identifier) == 0)
650 static_cast<Derived*
>(
this)->_do_portal_subscribe(identifier);
653 forwarder_subscriptions_.insert(std::make_pair(identifier, subscription));
655 forwarder_subscription_identifiers_[subscription->subscriber_id()].insert(
656 std::make_pair(identifier, forwarder_subscriptions_.find(identifier)));
673 auto it = forwarder_subscription_identifiers_[subscriber_id].find(identifier);
674 if (it != forwarder_subscription_identifiers_[subscriber_id].end())
676 bool no_forwarder_subscribers =
true;
677 for (
const auto& p : forwarder_subscription_identifiers_)
679 if (p.second.count(identifier) != 0)
681 no_forwarder_subscribers =
false;
687 if (no_forwarder_subscribers)
690 forwarder_subscriptions_.erase(it->second);
693 if (portal_subscriptions_.count(identifier) == 0)
694 static_cast<Derived*
>(
this)->_do_portal_unsubscribe(identifier);
697 forwarder_subscription_identifiers_[subscriber_id].erase(it);
702 const std::shared_ptr<const middleware::SerializationSubscriptionRegex>& new_sub)
704 if (regex_subscriptions_.empty())
705 static_cast<Derived*
>(
this)->_do_portal_wildcard_subscribe();
707 regex_subscriptions_.insert(std::make_pair(new_sub->subscriber_id(), new_sub));
713 std::string identifier =
716 static_cast<Derived*
>(
this)->_do_publish(identifier, bytes);
721 std::unordered_multimap<std::string,
722 std::shared_ptr<const middleware::SerializationHandlerBase<>>>
723 portal_subscriptions_;
725 std::unordered_map<std::string, std::shared_ptr<const middleware::SerializationHandlerBase<>>>
726 forwarder_subscriptions_;
730 std::string, std::unordered_map<
731 std::string,
typename decltype(forwarder_subscriptions_)::const_iterator>>
732 forwarder_subscription_identifiers_;
735 std::unordered_multimap<std::string,
736 std::shared_ptr<const middleware::SerializationSubscriptionRegex>>
737 regex_subscriptions_;
740template <
typename Derived,
typename InnerTransporter,
typename ImplementationTag>
760 this->
inner().template subscribe<Base::to_portal_group_, SerializerTransporterMessage>(
761 [
this](std::shared_ptr<const SerializerTransporterMessage> d)
763 std::vector<char> data(d->data().begin(), d->data().end());
765 d->key().type(), d->key().marshalling_scheme(), data,
769 this->
inner().template subscribe<Base::to_portal_group_, SerializationHandlerBase<>>(
770 [
this](std::shared_ptr<const middleware::SerializationHandlerBase<>> s)
773 this->
inner().template subscribe<Base::to_portal_group_, SerializationSubscriptionRegex>(
774 [
this](std::shared_ptr<const middleware::SerializationSubscriptionRegex> s)
777 this->
inner().template subscribe<Base::to_portal_group_, SerializationUnSubscribeAll>(
778 [
this](std::shared_ptr<const middleware::SerializationUnSubscribeAll> s)
798template <
typename InnerTransporter>
803 using Base = InterProcessForwarder<InnerTransporter, zeromq::detail::InterProcessTag>;
805 [[deprecated(
"Use zeromq::InterProcessForwarder<> or udpm::InterProcessForwarder<> instead of "
806 "middleware::InterProcessForwarder<>")]]
Simple exception class for Goby applications
Implementation of Group for dynamic (run-time) instantiations. Use Group directly for static (compile...
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
InnerTransporter & inner()
Implements the forwarder concept for the interprocess layer.
InterProcessForwarder(InnerTransporter &inner)
Construct a forwarder for the interprocess layer.
InterProcessTransporterBase< InterProcessForwarder< InnerTransporter, ImplementationTag >, InnerTransporter, ImplementationTag > Base
virtual ~InterProcessForwarder()
std::string _make_identifier(const goby::middleware::Group &group, IdentifierWildcard wildcard)
static std::tuple< std::string, int, std::string, int, std::size_t > parse_identifier(const std::string &identifier)
static const char end_delimiter
InterProcessPortalBase(InnerTransporter &inner)
virtual ~InterProcessPortalBase()
void _subscribe(std::function< void(std::shared_ptr< const Data > d)> f, const goby::middleware::Group &group, const middleware::Subscriber< Data > &)
void _handle_received_data(std::unique_ptr< std::unique_lock< std::mutex > > &lock, const std::string &data)
void _publish_serialized(std::string type_name, int scheme, const std::vector< char > &bytes, const goby::middleware::Group &group)
void _publish(const Data &d, const goby::middleware::Group &group, const middleware::Publisher< Data > &)
void _unsubscribe_all(const std::string &subscriber_id=middleware::identifier_part_to_string(std::this_thread::get_id()))
void _forwarder_unsubscribe(const std::string &subscriber_id, const std::string &identifier)
void _unsubscribe(const goby::middleware::Group &group, const middleware::Subscriber< Data > &=middleware::Subscriber< Data >())
std::shared_ptr< middleware::SerializationSubscriptionRegex > _subscribe_regex(std::function< void(const std::vector< unsigned char > &, int scheme, const std::string &type, const goby::middleware::Group &group)> f, const std::set< int > &schemes, const std::string &type_regex, const std::string &group_regex)
void _subscribe_regex_serialized(const std::shared_ptr< const middleware::SerializationSubscriptionRegex > &new_sub)
void _receive_subscription_forwarded(const std::shared_ptr< const middleware::SerializationHandlerBase<> > &subscription)
Base class for implementing transporters (both portal and forwarder) for the interprocess layer.
virtual ~InterProcessTransporterBase()
void check_validity()
Check validity of the Group for interprocess use (at compile time)
static constexpr int scheme()
Returns the marshalling scheme id for a given data type on this layer
static constexpr auto from_portal_group_name_
void unsubscribe_dynamic(const Group &group, const Subscriber< Data > &subscriber=Subscriber< Data >())
Unsubscribe from a specific run-time defined group and data type. Where possible, prefer the static var...
static constexpr Group to_portal_group_
void check_validity_runtime(const Group &group)
Check validity of the Group for interprocess use (for DynamicGroup at run time)
ImplementationTag implementation_tag
The ImplementationTag for this transporter (allows InterVehiclePortal to match the driver thread's fo...
static constexpr Group regex_group_
void subscribe_dynamic(std::function< void(std::shared_ptr< const Data >)> f, const Group &group, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (shared pointer variant)....
void publish_dynamic(const Data &data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (const reference variant)....
static constexpr Group from_portal_group_
void publish_serialized(std::string type_name, int scheme, const std::vector< char > &bytes, const goby::middleware::Group &group)
Publish a message that has already been serialized for the given scheme.
void subscribe_type_regex(std::function< void(std::shared_ptr< const Data >, const std::string &type)> f, const std::string &type_regex=".*")
Subscribe to a number of types within a given group and scheme using a regular expression.
InterProcessTransporterBase(InnerTransporter &inner)
std::shared_ptr< SerializationSubscriptionRegex > subscribe_type_regex(std::function< void(std::shared_ptr< const Data >, const std::string &type)> f, const Group &group, const std::string &type_regex=".*")
Subscribe to a number of types within a given group and scheme using a regular expression.
void subscribe_dynamic(std::function< void(const Data &)> f, const Group &group, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (const reference variant)....
static constexpr auto to_portal_group_name_
void publish_dynamic(std::shared_ptr< Data > data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to mutable data variant)....
InterProcessTransporterBase()
std::shared_ptr< SerializationSubscriptionRegex > subscribe_regex(std::function< void(const std::vector< unsigned char > &, int scheme, const std::string &type, const Group &group)> f, const std::set< int > &schemes, const std::string &type_regex=".*", const std::string &group_regex=".*")
Subscribe to multiple groups and/or types at once using regular expressions.
void publish_dynamic(std::shared_ptr< const Data > data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to const data variant)....
void unsubscribe_all()
Unsubscribe from all current subscriptions.
static constexpr auto regex_group_name_
Utility class for allowing the various Goby middleware transporters to poll the underlying transport ...
Class that holds additional metadata and callback functions related to a publication (and is optional...
const goby::middleware::protobuf::TransporterConfig & cfg() const
Returns the metadata configuration.
Base class for handling posting callbacks for serialized data types (interprocess and outer)
Defines the common interface for publishing and subscribing data using static (constexpr) groups on G...
void subscribe(std::function< void(const Data &)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (const reference variant)
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
goby::util::logger::GroupSetter group(std::string n)
constexpr auto concat(const char(&a)[N1], const char(&b)[N2])
constexpr int scheme()
Placeholder to provide an interface for the scheme() function family.
@ PROCESS_THREAD_WILDCARD
std::string identifier_part_to_string(int i)
middleware::InterProcessForwarder< InnerTransporter, detail::InterProcessTag > InterProcessForwarder
The global namespace for the Goby project.
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::PROTOBUF_NAMESPACE_ID::MessageOptions, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
util::FlexOstream glog
Access the Goby logger through this object.
type
Categories for the various JSON types used in JWTs.
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
static std::string type_name()
The marshalling scheme specific string name for this type.
static std::shared_ptr< DataType > parse(CharIterator bytes_begin, CharIterator bytes_end, CharIterator &actual_end, const std::string &type=type_name())
Given a beginning and end iterator to bytes, parse the data and return it.