292 :
public PortalBase<InterProcessPortalImplementation<InnerTransporter, PortalBase>,
301 zmq_context_(cfg.zeromq_number_io_threads()),
302 zmq_main_(zmq_context_),
303 zmq_read_thread_(cfg_, zmq_context_, zmq_alive_, middleware::PollerInterface::cv())
312 zmq_context_(cfg.zeromq_number_io_threads()),
313 zmq_main_(zmq_context_),
314 zmq_read_thread_(cfg_, zmq_context_, zmq_alive_, middleware::PollerInterface::cv())
335 friend typename Base::Base;
343 zmq_thread_ = std::make_unique<std::thread>([
this]() { zmq_read_thread_.
run(); });
348 if (zmq_main_.
recv(&control_msg))
350 switch (control_msg.
type())
364 _subscribe<protobuf::ManagerResponse, middleware::MarshallingScheme::PROTOBUF>(
365 [
this](std::shared_ptr<const protobuf::ManagerResponse> response)
368 << response->ShortDebugString() << std::endl;
370 response->client_pid() == getpid() &&
388 template <
typename Data,
int scheme>
394 _publish_serialized(type_name, scheme, bytes,
group, ignore_buffer);
397 void _publish_serialized(std::string type_name,
int scheme,
const std::vector<char>& bytes,
400 std::string identifier = _make_fully_qualified_identifier(type_name, scheme,
group) +
'\0';
401 zmq_main_.
publish(identifier, &bytes[0], bytes.size(), ignore_buffer);
404 template <
typename Data,
int scheme>
405 void _subscribe(std::function<
void(std::shared_ptr<const Data> d)> f,
407 const middleware::Subscriber<Data>& )
409 std::string identifier =
412 auto subscription = std::make_shared<middleware::SerializationSubscription<Data, scheme>>(
415 [=](
const Data& ) {
return group; }));
417 if (forwarder_subscriptions_.count(identifier) == 0 &&
418 portal_subscriptions_.count(identifier) == 0)
420 portal_subscriptions_.insert(std::make_pair(identifier, subscription));
423 std::shared_ptr<middleware::SerializationSubscriptionRegex> _subscribe_regex(
424 std::function<
void(
const std::vector<unsigned char>&,
int scheme,
const std::string& type,
427 const std::set<int>& schemes,
const std::string& type_regex,
const std::string& group_regex)
429 auto new_sub = std::make_shared<middleware::SerializationSubscriptionRegex>(
430 f, schemes, type_regex, group_regex);
431 _subscribe_regex(new_sub);
435 template <
typename Data,
int scheme>
438 const middleware::Subscriber<Data>& = middleware::Subscriber<Data>())
440 std::string identifier =
443 portal_subscriptions_.erase(identifier);
446 if (forwarder_subscriptions_.count(identifier) == 0)
450 void _unsubscribe_all(
456 for (
const auto& p : portal_subscriptions_)
458 const auto& identifier = p.first;
459 if (forwarder_subscriptions_.count(identifier) == 0)
462 portal_subscriptions_.clear();
466 while (forwarder_subscription_identifiers_[subscriber_id].size() > 0)
467 _forwarder_unsubscribe(
469 forwarder_subscription_identifiers_[subscriber_id].begin()->first);
473 if (regex_subscriptions_.size() > 0)
475 regex_subscriptions_.erase(subscriber_id);
476 if (regex_subscriptions_.empty())
481 int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock)
484 protobuf::InprocControl new_control_msg;
486#ifdef USE_OLD_ZMQ_CPP_API
487 int flags = ZMQ_NOBLOCK;
489 auto flags = zmq::recv_flags::dontwait;
492 while (zmq_main_.
recv(&new_control_msg, flags))
498 switch (control_msg.type())
506 const auto& data = control_msg.received_data();
510 std::tie(
group, scheme, type, process, thread) = parse_identifier(data);
511 std::string identifier = _make_identifier(
515 std::vector<std::weak_ptr<const middleware::SerializationHandlerBase<>>>
517 auto portal_range = portal_subscriptions_.equal_range(identifier);
518 for (
auto it = portal_range.first; it != portal_range.second; ++it)
519 subs_to_post.push_back(it->second);
520 auto forwarder_it = forwarder_subscriptions_.find(identifier);
521 if (forwarder_it != forwarder_subscriptions_.end())
522 subs_to_post.push_back(forwarder_it->second);
526 const auto& data = control_msg.received_data();
527 auto null_delim_it = std::find(std::begin(data), std::end(data),
'\0');
528 for (
auto& sub : subs_to_post)
530 if (
auto sub_sp = sub.lock())
531 sub_sp->post(null_delim_it + 1, data.end());
535 if (!regex_subscriptions_.empty())
537 auto null_delim_it = std::find(std::begin(data), std::end(data),
'\0');
539 bool forwarder_subscription_posted =
false;
540 for (
auto& sub : regex_subscriptions_)
543 bool is_forwarded_sub =
545 if (is_forwarded_sub && forwarder_subscription_posted)
548 if (sub.second->post(null_delim_it + 1, data.end(), scheme, type,
551 forwarder_subscription_posted =
true;
559 protobuf::ManagerRequest req;
561 req.set_ready(ready_);
564 req.set_client_pid(getpid());
567 << req.ShortDebugString() << std::endl;
569 _publish<protobuf::ManagerRequest, middleware::MarshallingScheme::PROTOBUF>(
571 middleware::Publisher<protobuf::ManagerRequest>(),
true);
582 void _receive_publication_forwarded(
585 std::string identifier =
586 _make_identifier(
msg.key().type(),
msg.key().marshalling_scheme(),
msg.key().group(),
589 auto& bytes =
msg.data();
590 zmq_main_.
publish(identifier, &bytes[0], bytes.size());
593 void _receive_subscription_forwarded(
594 const std::shared_ptr<
const middleware::SerializationHandlerBase<>>& subscription)
596 std::string identifier = _make_identifier(subscription->type_name(), subscription->scheme(),
597 subscription->subscribed_group(),
601 goby::glog <<
"Received subscription forwarded for identifier [" << identifier
602 <<
"] from subscriber id: " << subscription->subscriber_id() << std::endl;
604 switch (subscription->action())
609 if (forwarder_subscription_identifiers_[subscription->subscriber_id()].count(
613 if (forwarder_subscriptions_.count(identifier) == 0)
616 if (portal_subscriptions_.count(identifier) == 0)
620 forwarder_subscriptions_.insert(std::make_pair(identifier, subscription));
622 forwarder_subscription_identifiers_[subscription->subscriber_id()].insert(
623 std::make_pair(identifier, forwarder_subscriptions_.find(identifier)));
630 _forwarder_unsubscribe(subscription->subscriber_id(), identifier);
638 void _forwarder_unsubscribe(
const std::string& subscriber_id,
const std::string& identifier)
640 auto it = forwarder_subscription_identifiers_[subscriber_id].find(identifier);
641 if (it != forwarder_subscription_identifiers_[subscriber_id].end())
643 bool no_forwarder_subscribers =
true;
644 for (
const auto& p : forwarder_subscription_identifiers_)
646 if (p.second.count(identifier) != 0)
648 no_forwarder_subscribers =
false;
654 if (no_forwarder_subscribers)
657 forwarder_subscriptions_.erase(it->second);
660 if (portal_subscriptions_.count(identifier) == 0)
664 forwarder_subscription_identifiers_[subscriber_id].erase(it);
668 void _receive_regex_subscription_forwarded(
669 std::shared_ptr<const middleware::SerializationSubscriptionRegex> subscription)
671 _subscribe_regex(subscription);
674 void _subscribe_regex(
675 const std::shared_ptr<const middleware::SerializationSubscriptionRegex>& new_sub)
677 if (regex_subscriptions_.empty())
680 regex_subscriptions_.insert(std::make_pair(new_sub->subscriber_id(), new_sub));
683 template <
typename Data,
int scheme>
687 scheme,
group, wildcard);
690 std::string _make_fully_qualified_identifier(
const std::string& type_name,
int scheme,
691 const std::string&
group)
697 template <
typename Data,
int scheme>
702 scheme,
group, wildcard);
705 std::string _make_identifier(
const std::string& type_name,
int scheme,
const std::string&
group,
712 std::tuple<std::string, int, std::string, int, std::size_t>
713 parse_identifier(
const std::string& identifier)
725 const int number_elements = POS_MAX + 1;
726 std::string::size_type previous_delimiter = 0;
727 std::vector<std::string> elem;
728 for (
auto i = 0; i < number_elements; ++i)
730 auto delimiter_pos = identifier.find(
delimiter, previous_delimiter + 1);
731 elem.push_back(identifier.substr(previous_delimiter + 1,
732 delimiter_pos - (previous_delimiter + 1)));
733 previous_delimiter = delimiter_pos;
736 auto&
group = elem[POS_GROUP];
737 auto&
type = elem[POS_TYPE];
740 return std::make_tuple(elem[POS_GROUP],
742 elem[POS_TYPE], std::stoi(elem[POS_PROCESS]),
743 std::stoull(elem[POS_THREAD],
nullptr, 16));
747 const protobuf::InterProcessPortalConfig cfg_;
749 std::unique_ptr<std::thread> zmq_thread_;
750 std::atomic<bool> zmq_alive_{
true};
751 zmq::context_t zmq_context_;
752 InterProcessPortalMainThread zmq_main_;
753 InterProcessPortalReadThread zmq_read_thread_;
756 std::unordered_multimap<std::string,
757 std::shared_ptr<const middleware::SerializationHandlerBase<>>>
758 portal_subscriptions_;
760 std::unordered_map<std::string, std::shared_ptr<const middleware::SerializationHandlerBase<>>>
761 forwarder_subscriptions_;
763 std::string, std::unordered_map<
764 std::string,
typename decltype(forwarder_subscriptions_)::const_iterator>>
765 forwarder_subscription_identifiers_;
767 std::unordered_multimap<std::string,
768 std::shared_ptr<const middleware::SerializationSubscriptionRegex>>
769 regex_subscriptions_;
770 std::string process_{std::to_string(getpid())};
771 std::unordered_map<int, std::string> schemes_;
772 std::unordered_map<std::thread::id, std::string> threads_;