Goby3 3.3.0
2025.07.10
Loading...
Searching...
No Matches
interprocess.h
Go to the documentation of this file.
1// Copyright 2016-2025:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6//
7//
8// This file is part of the Goby Underwater Autonomy Project Libraries
9// ("The Goby Libraries").
10//
11// The Goby Libraries are free software: you can redistribute them and/or modify
12// them under the terms of the GNU Lesser General Public License as published by
13// the Free Software Foundation, either version 2.1 of the License, or
14// (at your option) any later version.
15//
16// The Goby Libraries are distributed in the hope that they will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU Lesser General Public License for more details.
20//
21// You should have received a copy of the GNU Lesser General Public License
22// along with Goby. If not, see <http://www.gnu.org/licenses/>.
23
24#ifndef GOBY_ZEROMQ_TRANSPORT_INTERPROCESS_H
25#define GOBY_ZEROMQ_TRANSPORT_INTERPROCESS_H
26
28
29#include <atomic> // for atomic
30#include <chrono> // for mill...
31#include <condition_variable> // for cond...
32#include <deque> // for deque
33#include <functional> // for func...
34#include <iosfwd> // for size_t
35#include <memory> // for shar...
36#include <mutex> // for time...
37#include <set> // for set
38#include <string> // for string
39#include <thread> // for get_id
40#include <tuple> // for make...
41#include <unistd.h> // for getpid
42#include <unordered_map> // for unor...
43#include <utility> // for make...
44#include <vector> // for vector
45
46#include <zmq.h> // for ZMQ_...
47#include <zmq.hpp> // for sock...
48
49#include "goby/middleware/common.h" // for thre...
50#include "goby/middleware/group.h" // for Group
51#include "goby/middleware/marshalling/interface.h" // for Seri...
54#include "goby/middleware/transport/interface.h" // for Poll...
55#include "goby/middleware/transport/interprocess.h" // for Inte...
56#include "goby/middleware/transport/null.h" // for Null...
58#include "goby/middleware/transport/subscriber.h" // for Subs...
59#include "goby/time/system_clock.h" // for Syst...
60#include "goby/util/debug_logger/flex_ostream.h" // for Flex...
64
// Compile-time switches that select between older and newer ZeroMQ C++ APIs,
// keyed off the library versions found at build time.

// When defined, receive flags are plain ints (e.g. ZMQ_NOBLOCK) rather than
// zmq::recv_flags / zmq::send_flags values.
65#if ZMQ_VERSION <= ZMQ_MAKE_VERSION(4, 3, 1)
66#define USE_OLD_ZMQ_CPP_API
67#endif
68
// When defined, socket options are applied with socket.setsockopt(ZMQ_LINGER, ...)
// instead of socket.set(zmq::sockopt::linger, ...).
69#if CPPZMQ_VERSION < ZMQ_MAKE_VERSION(4, 7, 1)
70#define USE_OLD_CPPZMQ_SETSOCKOPT
71#endif
72
// NOTE(review): not referenced in this header; presumably consumed by the
// polling code in the implementation file - confirm.
73#if CPPZMQ_VERSION < ZMQ_MAKE_VERSION(4, 8, 0)
74#define USE_OLD_CPPZMQ_POLL
75#endif
76
77namespace goby
78{
79namespace middleware
80{
81template <typename Data> class Publisher;
82} // namespace middleware
83
84namespace zeromq
85{
86namespace groups
87{
88constexpr goby::middleware::Group manager_request{"goby::zeromq::_internal_manager_request"};
89constexpr goby::middleware::Group manager_response{"goby::zeromq::_internal_manager_response"};
90} // namespace groups
91
92constexpr char delimiter{'/'};
93constexpr const char* delimiter_str{"/"};
94// use old ASCII substitute char for '/' in group or type
95constexpr char delimiter_substitute{0x1a};
96
97void setup_socket(zmq::socket_t& socket, const protobuf::Socket& cfg);
98
105
106// scheme
107inline std::string identifier_part_to_string(int i)
108{
110}
111inline std::string identifier_part_to_string(std::thread::id i)
112{
114}
115
117template <typename Key>
118const std::string& id_component(const Key& k, std::unordered_map<Key, std::string>& map)
119{
120 auto it = map.find(k);
121 if (it != map.end())
122 return it->second;
123
124 std::string v = identifier_part_to_string(k) + delimiter_str;
125 auto it_pair = map.insert(std::make_pair(k, v));
126 return it_pair.first->second;
127}
128
129inline std::string
130make_identifier(const std::string& type_name, int scheme, const std::string& group,
131 IdentifierWildcard wildcard, const std::string& process,
132 std::unordered_map<int, std::string>* schemes_buffer = nullptr,
133 std::unordered_map<std::thread::id, std::string>* threads_buffer = nullptr)
134{
135 // swap out delimiter with substitute
136 std::string sanitized_type_name = type_name;
137 std::replace(sanitized_type_name.begin(), sanitized_type_name.end(), delimiter,
139 std::string sanitized_group_name = group;
140 std::replace(sanitized_group_name.begin(), sanitized_group_name.end(), delimiter,
142 switch (wildcard)
143 {
144 default:
146 {
147 auto thread = std::this_thread::get_id();
148 return (
149 delimiter_str + sanitized_group_name + delimiter_str +
150 (schemes_buffer ? id_component(scheme, *schemes_buffer)
151 : std::string(identifier_part_to_string(scheme) + delimiter_str)) +
152 sanitized_type_name + delimiter_str + process + delimiter_str +
153 (threads_buffer ? id_component(thread, *threads_buffer)
154 : std::string(identifier_part_to_string(thread) + delimiter_str)));
155 }
157 {
158 return (delimiter_str + sanitized_group_name + delimiter_str +
159 (schemes_buffer
160 ? id_component(scheme, *schemes_buffer)
161 : std::string(identifier_part_to_string(scheme) + delimiter_str)) +
162 sanitized_type_name + delimiter_str + process + delimiter_str);
163 }
165 {
166 return (delimiter_str + sanitized_group_name + delimiter_str +
167 (schemes_buffer
168 ? id_component(scheme, *schemes_buffer)
169 : std::string(identifier_part_to_string(scheme) + delimiter_str)) +
170 sanitized_type_name + delimiter_str);
171 }
172 }
173}
174
175#ifdef USE_OLD_ZMQ_CPP_API
178#else
179using zmq_recv_flags_type = zmq::recv_flags;
180using zmq_send_flags_type = zmq::send_flags;
181#endif
182
183// run in the same thread as InterProcessPortal
185{
186 public:
187 InterProcessPortalMainThread(zmq::context_t& context);
189 {
190#ifdef USE_OLD_CPPZMQ_SETSOCKOPT
191 control_socket_.setsockopt(ZMQ_LINGER, 0);
192 publish_socket_.setsockopt(ZMQ_LINGER, 0);
193#else
194 control_socket_.set(zmq::sockopt::linger, 0);
195 publish_socket_.set(zmq::sockopt::linger, 0);
196#endif
197 }
198
199 bool publish_ready() { return !hold_; }
200 bool subscribe_ready() { return have_pubsub_sockets_; }
201
202 bool recv(protobuf::InprocControl* control_msg,
205
206 void set_hold_state(bool hold);
207 bool hold_state() { return hold_; }
208
209 void publish(const std::string& identifier, const char* bytes, int size,
210 bool ignore_buffer = false);
211 void subscribe(const std::string& identifier);
212 void unsubscribe(const std::string& identifier);
214
215 std::deque<protobuf::InprocControl>& control_buffer() { return control_buffer_; }
217
218 private:
219 private:
220 zmq::socket_t control_socket_;
221 zmq::socket_t publish_socket_;
222 bool hold_{true};
223 bool have_pubsub_sockets_{false};
224
225 std::deque<std::pair<std::string, std::vector<char>>>
226 publish_queue_; //used before hold == false
227
228 // buffer messages while waiting for (un)subscribe ack
229 std::deque<protobuf::InprocControl> control_buffer_;
230};
231
232// run in a separate thread to allow zmq_.poll() to block without interrupting the main thread
234{
235 public:
237 zmq::context_t& context, std::atomic<bool>& alive,
238 std::shared_ptr<std::condition_variable_any> poller_cv);
239 void run();
241 {
242#ifdef USE_OLD_CPPZMQ_SETSOCKOPT
243 control_socket_.setsockopt(ZMQ_LINGER, 0);
244 subscribe_socket_.setsockopt(ZMQ_LINGER, 0);
245 manager_socket_.setsockopt(ZMQ_LINGER, 0);
246#else
247 control_socket_.set(zmq::sockopt::linger, 0);
248 subscribe_socket_.set(zmq::sockopt::linger, 0);
249 manager_socket_.set(zmq::sockopt::linger, 0);
250#endif
251 }
252
253 private:
254 void poll(long timeout_ms = -1);
255 void control_data(const zmq::message_t& zmq_msg);
256 void subscribe_data(const zmq::message_t& zmq_msg);
257 void manager_data(const zmq::message_t& zmq_msg);
258 void send_control_msg(const protobuf::InprocControl& control);
259 void send_manager_request(const protobuf::ManagerRequest& req);
260
261 private:
263 zmq::socket_t control_socket_;
264 zmq::socket_t subscribe_socket_;
265 zmq::socket_t manager_socket_;
266 std::atomic<bool>& alive_;
267 std::shared_ptr<std::condition_variable_any> poller_cv_;
268 std::vector<zmq::pollitem_t> poll_items_;
269 enum
270 {
271 SOCKET_CONTROL = 0,
272 SOCKET_MANAGER = 1,
273 SOCKET_SUBSCRIBE = 2
274 };
275 enum
276 {
277 NUMBER_SOCKETS = 3
278 };
279 bool have_pubsub_sockets_{false};
280 bool hold_{true};
281 bool manager_waiting_for_reply_{false};
282
283 goby::time::SystemClock::time_point next_hold_state_request_time_{
285 const goby::time::SystemClock::duration hold_state_request_period_{
286 std::chrono::milliseconds(100)};
287};
288
289template <typename InnerTransporter,
290 template <typename Derived, typename InnerTransporterType> class PortalBase>
292 : public PortalBase<InterProcessPortalImplementation<InnerTransporter, PortalBase>,
293 InnerTransporter>
294{
295 public:
297 InnerTransporter>;
298
300 : cfg_(cfg),
301 zmq_context_(cfg.zeromq_number_io_threads()),
302 zmq_main_(zmq_context_),
303 zmq_read_thread_(cfg_, zmq_context_, zmq_alive_, middleware::PollerInterface::cv())
304 {
305 _init();
306 }
307
308 InterProcessPortalImplementation(InnerTransporter& inner,
310 : Base(inner),
311 cfg_(cfg),
312 zmq_context_(cfg.zeromq_number_io_threads()),
313 zmq_main_(zmq_context_),
314 zmq_read_thread_(cfg_, zmq_context_, zmq_alive_, middleware::PollerInterface::cv())
315 {
316 _init();
317 }
318
320 {
321 if (zmq_thread_)
322 {
323 zmq_main_.reader_shutdown();
324 zmq_thread_->join();
325 }
326 }
327
329 void ready() { ready_ = true; }
330
332 bool hold_state() { return zmq_main_.hold_state(); }
333
334 friend Base;
335 friend typename Base::Base;
336
337 private:
338 void _init()
339 {
341
342 // start zmq read thread
343 zmq_thread_ = std::make_unique<std::thread>([this]() { zmq_read_thread_.run(); });
344
345 while (!zmq_main_.subscribe_ready())
346 {
347 protobuf::InprocControl control_msg;
348 if (zmq_main_.recv(&control_msg))
349 {
350 switch (control_msg.type())
351 {
353 zmq_main_.set_publish_cfg(control_msg.publish_socket());
354 break;
355 default: break;
356 }
357 }
358 }
359
360 //
361 // Handle hold state request/response using pub sub so that we ensure
362 // publishing and subscribe is completely functional before releasing the hold
363 //
364 _subscribe<protobuf::ManagerResponse, middleware::MarshallingScheme::PROTOBUF>(
365 [this](std::shared_ptr<const protobuf::ManagerResponse> response)
366 {
367 goby::glog.is_debug3() && goby::glog << "Received ManagerResponse: "
368 << response->ShortDebugString() << std::endl;
369 if (response->request() == protobuf::PROVIDE_HOLD_STATE &&
370 response->client_pid() == getpid() &&
371 response->client_name() == cfg_.client_name())
372 {
373 zmq_main_.set_hold_state(response->hold());
374 }
375
376 // we're good to go now, so let's unsubscribe to this group
377 if (zmq_main_.publish_ready())
378 {
379 _unsubscribe<protobuf::ManagerResponse,
383 }
384 },
386 }
387
388 template <typename Data, int scheme>
389 void _publish(const Data& d, const goby::middleware::Group& group,
390 const middleware::Publisher<Data>& /*publisher*/, bool ignore_buffer = false)
391 {
394 _publish_serialized(type_name, scheme, bytes, group, ignore_buffer);
395 }
396
397 void _publish_serialized(std::string type_name, int scheme, const std::vector<char>& bytes,
398 const goby::middleware::Group& group, bool ignore_buffer = false)
399 {
400 std::string identifier = _make_fully_qualified_identifier(type_name, scheme, group) + '\0';
401 zmq_main_.publish(identifier, &bytes[0], bytes.size(), ignore_buffer);
402 }
403
404 template <typename Data, int scheme>
405 void _subscribe(std::function<void(std::shared_ptr<const Data> d)> f,
407 const middleware::Subscriber<Data>& /*subscriber*/)
408 {
409 std::string identifier =
410 _make_identifier<Data, scheme>(group, IdentifierWildcard::PROCESS_THREAD_WILDCARD);
411
412 auto subscription = std::make_shared<middleware::SerializationSubscription<Data, scheme>>(
413 f, group,
414 middleware::Subscriber<Data>(goby::middleware::protobuf::TransporterConfig(),
415 [=](const Data& /*d*/) { return group; }));
416
417 if (forwarder_subscriptions_.count(identifier) == 0 &&
418 portal_subscriptions_.count(identifier) == 0)
419 zmq_main_.subscribe(identifier);
420 portal_subscriptions_.insert(std::make_pair(identifier, subscription));
421 }
422
423 std::shared_ptr<middleware::SerializationSubscriptionRegex> _subscribe_regex(
424 std::function<void(const std::vector<unsigned char>&, int scheme, const std::string& type,
426 f,
427 const std::set<int>& schemes, const std::string& type_regex, const std::string& group_regex)
428 {
429 auto new_sub = std::make_shared<middleware::SerializationSubscriptionRegex>(
430 f, schemes, type_regex, group_regex);
431 _subscribe_regex(new_sub);
432 return new_sub;
433 }
434
435 template <typename Data, int scheme>
436 void _unsubscribe(
438 const middleware::Subscriber<Data>& /*subscriber*/ = middleware::Subscriber<Data>())
439 {
440 std::string identifier =
441 _make_identifier<Data, scheme>(group, IdentifierWildcard::PROCESS_THREAD_WILDCARD);
442
443 portal_subscriptions_.erase(identifier);
444
445 // If no forwarded subscriptions, do the actual unsubscribe
446 if (forwarder_subscriptions_.count(identifier) == 0)
447 zmq_main_.unsubscribe(identifier);
448 }
449
450 void _unsubscribe_all(
451 const std::string& subscriber_id = identifier_part_to_string(std::this_thread::get_id()))
452 {
453 // portal unsubscribe
454 if (subscriber_id == identifier_part_to_string(std::this_thread::get_id()))
455 {
456 for (const auto& p : portal_subscriptions_)
457 {
458 const auto& identifier = p.first;
459 if (forwarder_subscriptions_.count(identifier) == 0)
460 zmq_main_.unsubscribe(identifier);
461 }
462 portal_subscriptions_.clear();
463 }
464 else // forwarder unsubscribe
465 {
466 while (forwarder_subscription_identifiers_[subscriber_id].size() > 0)
467 _forwarder_unsubscribe(
468 subscriber_id,
469 forwarder_subscription_identifiers_[subscriber_id].begin()->first);
470 }
471
472 // regex
473 if (regex_subscriptions_.size() > 0)
474 {
475 regex_subscriptions_.erase(subscriber_id);
476 if (regex_subscriptions_.empty())
477 zmq_main_.unsubscribe(delimiter_str);
478 }
479 }
480
481 int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock)
482 {
483 int items = 0;
484 protobuf::InprocControl new_control_msg;
485
486#ifdef USE_OLD_ZMQ_CPP_API
487 int flags = ZMQ_NOBLOCK;
488#else
489 auto flags = zmq::recv_flags::dontwait;
490#endif
491
492 while (zmq_main_.recv(&new_control_msg, flags))
493 zmq_main_.control_buffer().push_back(new_control_msg);
494
495 while (!zmq_main_.control_buffer().empty())
496 {
497 const auto& control_msg = zmq_main_.control_buffer().front();
498 switch (control_msg.type())
499 {
501 {
502 ++items;
503 if (lock)
504 lock.reset();
505
506 const auto& data = control_msg.received_data();
507
508 std::string group, type, thread;
509 int scheme, process;
510 std::tie(group, scheme, type, process, thread) = parse_identifier(data);
511 std::string identifier = _make_identifier(
513
514 // build a set so if any of the handlers unsubscribes, we still have a pointer to the middleware::SerializationHandlerBase<>
515 std::vector<std::weak_ptr<const middleware::SerializationHandlerBase<>>>
516 subs_to_post;
517 auto portal_range = portal_subscriptions_.equal_range(identifier);
518 for (auto it = portal_range.first; it != portal_range.second; ++it)
519 subs_to_post.push_back(it->second);
520 auto forwarder_it = forwarder_subscriptions_.find(identifier);
521 if (forwarder_it != forwarder_subscriptions_.end())
522 subs_to_post.push_back(forwarder_it->second);
523
524 // actually post the data
525 {
526 const auto& data = control_msg.received_data();
527 auto null_delim_it = std::find(std::begin(data), std::end(data), '\0');
528 for (auto& sub : subs_to_post)
529 {
530 if (auto sub_sp = sub.lock())
531 sub_sp->post(null_delim_it + 1, data.end());
532 }
533 }
534
535 if (!regex_subscriptions_.empty())
536 {
537 auto null_delim_it = std::find(std::begin(data), std::end(data), '\0');
538
539 bool forwarder_subscription_posted = false;
540 for (auto& sub : regex_subscriptions_)
541 {
542 // only post at most once for forwarders as the threads will filter
543 bool is_forwarded_sub =
544 sub.first != identifier_part_to_string(std::this_thread::get_id());
545 if (is_forwarded_sub && forwarder_subscription_posted)
546 continue;
547
548 if (sub.second->post(null_delim_it + 1, data.end(), scheme, type,
549 group) &&
550 is_forwarded_sub)
551 forwarder_subscription_posted = true;
552 }
553 }
554 }
555 break;
556
558 {
559 protobuf::ManagerRequest req;
560
561 req.set_ready(ready_);
562 req.set_request(protobuf::PROVIDE_HOLD_STATE);
563 req.set_client_name(cfg_.client_name());
564 req.set_client_pid(getpid());
565
566 goby::glog.is_debug3() && goby::glog << "Published ManagerRequest: "
567 << req.ShortDebugString() << std::endl;
568
569 _publish<protobuf::ManagerRequest, middleware::MarshallingScheme::PROTOBUF>(
571 middleware::Publisher<protobuf::ManagerRequest>(), true);
572 }
573 break;
574
575 default: break;
576 }
577 zmq_main_.control_buffer().pop_front();
578 }
579 return items;
580 }
581
582 void _receive_publication_forwarded(
584 {
585 std::string identifier =
586 _make_identifier(msg.key().type(), msg.key().marshalling_scheme(), msg.key().group(),
588 '\0';
589 auto& bytes = msg.data();
590 zmq_main_.publish(identifier, &bytes[0], bytes.size());
591 }
592
593 void _receive_subscription_forwarded(
594 const std::shared_ptr<const middleware::SerializationHandlerBase<>>& subscription)
595 {
596 std::string identifier = _make_identifier(subscription->type_name(), subscription->scheme(),
597 subscription->subscribed_group(),
599
601 goby::glog << "Received subscription forwarded for identifier [" << identifier
602 << "] from subscriber id: " << subscription->subscriber_id() << std::endl;
603
604 switch (subscription->action())
605 {
607 {
608 // insert if this thread hasn't already subscribed
609 if (forwarder_subscription_identifiers_[subscription->subscriber_id()].count(
610 identifier) == 0)
611 {
612 // first to subscribe from a Forwarder
613 if (forwarder_subscriptions_.count(identifier) == 0)
614 {
615 // first to subscribe (locally or forwarded)
616 if (portal_subscriptions_.count(identifier) == 0)
617 zmq_main_.subscribe(identifier);
618
619 // create Forwarder subscription
620 forwarder_subscriptions_.insert(std::make_pair(identifier, subscription));
621 }
622 forwarder_subscription_identifiers_[subscription->subscriber_id()].insert(
623 std::make_pair(identifier, forwarder_subscriptions_.find(identifier)));
624 }
625 }
626 break;
627
629 {
630 _forwarder_unsubscribe(subscription->subscriber_id(), identifier);
631 }
632 break;
633
634 default: break;
635 }
636 }
637
638 void _forwarder_unsubscribe(const std::string& subscriber_id, const std::string& identifier)
639 {
640 auto it = forwarder_subscription_identifiers_[subscriber_id].find(identifier);
641 if (it != forwarder_subscription_identifiers_[subscriber_id].end())
642 {
643 bool no_forwarder_subscribers = true;
644 for (const auto& p : forwarder_subscription_identifiers_)
645 {
646 if (p.second.count(identifier) != 0)
647 {
648 no_forwarder_subscribers = false;
649 break;
650 }
651 }
652
653 // if no Forwarder subscriptions left
654 if (no_forwarder_subscribers)
655 {
656 // erase the Forwarder subscription
657 forwarder_subscriptions_.erase(it->second);
658
659 // do the actual unsubscribe if we aren't subscribe locally as well
660 if (portal_subscriptions_.count(identifier) == 0)
661 zmq_main_.unsubscribe(identifier);
662 }
663
664 forwarder_subscription_identifiers_[subscriber_id].erase(it);
665 }
666 }
667
668 void _receive_regex_subscription_forwarded(
669 std::shared_ptr<const middleware::SerializationSubscriptionRegex> subscription)
670 {
671 _subscribe_regex(subscription);
672 }
673
674 void _subscribe_regex(
675 const std::shared_ptr<const middleware::SerializationSubscriptionRegex>& new_sub)
676 {
677 if (regex_subscriptions_.empty())
678 zmq_main_.subscribe(delimiter_str);
679
680 regex_subscriptions_.insert(std::make_pair(new_sub->subscriber_id(), new_sub));
681 }
682
683 template <typename Data, int scheme>
684 std::string _make_identifier(const goby::middleware::Group& group, IdentifierWildcard wildcard)
685 {
687 scheme, group, wildcard);
688 }
689
690 std::string _make_fully_qualified_identifier(const std::string& type_name, int scheme,
691 const std::string& group)
692 {
693 return _make_identifier(type_name, scheme, group, IdentifierWildcard::THREAD_WILDCARD) +
694 id_component(std::this_thread::get_id(), threads_);
695 }
696
697 template <typename Data, int scheme>
698 std::string _make_identifier(const Data& d, const goby::middleware::Group& group,
699 IdentifierWildcard wildcard)
700 {
702 scheme, group, wildcard);
703 }
704
705 std::string _make_identifier(const std::string& type_name, int scheme, const std::string& group,
706 IdentifierWildcard wildcard)
707 {
708 return make_identifier(type_name, scheme, group, wildcard, process_, &schemes_, &threads_);
709 }
710
711 // group, scheme, type, process, thread
712 std::tuple<std::string, int, std::string, int, std::size_t>
713 parse_identifier(const std::string& identifier)
714 {
715 enum
716 {
717 POS_GROUP = 0,
718 POS_SCHEME = 1,
719 POS_TYPE = 2,
720 POS_PROCESS = 3,
721 POS_THREAD = 4,
722 POS_MAX = POS_THREAD
723 };
724
725 const int number_elements = POS_MAX + 1;
726 std::string::size_type previous_delimiter = 0;
727 std::vector<std::string> elem;
728 for (auto i = 0; i < number_elements; ++i)
729 {
730 auto delimiter_pos = identifier.find(delimiter, previous_delimiter + 1);
731 elem.push_back(identifier.substr(previous_delimiter + 1,
732 delimiter_pos - (previous_delimiter + 1)));
733 previous_delimiter = delimiter_pos;
734 }
735
736 auto& group = elem[POS_GROUP];
737 auto& type = elem[POS_TYPE];
738 std::replace(type.begin(), type.end(), delimiter_substitute, delimiter);
739 std::replace(group.begin(), group.end(), delimiter_substitute, delimiter);
740 return std::make_tuple(elem[POS_GROUP],
742 elem[POS_TYPE], std::stoi(elem[POS_PROCESS]),
743 std::stoull(elem[POS_THREAD], nullptr, 16));
744 }
745
746 private:
747 const protobuf::InterProcessPortalConfig cfg_;
748
749 std::unique_ptr<std::thread> zmq_thread_;
750 std::atomic<bool> zmq_alive_{true};
751 zmq::context_t zmq_context_;
752 InterProcessPortalMainThread zmq_main_;
753 InterProcessPortalReadThread zmq_read_thread_;
754
755 // maps identifier to subscription
756 std::unordered_multimap<std::string,
757 std::shared_ptr<const middleware::SerializationHandlerBase<>>>
758 portal_subscriptions_;
759 // only one subscription for each forwarded identifier
760 std::unordered_map<std::string, std::shared_ptr<const middleware::SerializationHandlerBase<>>>
761 forwarder_subscriptions_;
762 std::unordered_map<
763 std::string, std::unordered_map<
764 std::string, typename decltype(forwarder_subscriptions_)::const_iterator>>
765 forwarder_subscription_identifiers_;
766
767 std::unordered_multimap<std::string,
768 std::shared_ptr<const middleware::SerializationSubscriptionRegex>>
769 regex_subscriptions_;
770 std::string process_{std::to_string(getpid())};
771 std::unordered_map<int, std::string> schemes_;
772 std::unordered_map<std::thread::id, std::string> threads_;
773
774 bool ready_{false};
775};
776
778{
779 public:
780 Router(zmq::context_t& context, const protobuf::InterProcessPortalConfig& cfg)
781 : context_(context), cfg_(cfg)
782 {
783 }
784
785 void run();
786 unsigned last_port(zmq::socket_t& socket);
787
788 Router(Router&) = delete;
790
791 public:
792 std::atomic<unsigned> pub_port{0};
793 std::atomic<unsigned> sub_port{0};
794
795 private:
796 zmq::context_t& context_;
798};
799
801{
802 public:
803 Manager(zmq::context_t& context, const protobuf::InterProcessPortalConfig& cfg,
804 const Router& router);
805
806 Manager(zmq::context_t& context, const protobuf::InterProcessPortalConfig& cfg,
807 const Router& router, const protobuf::InterProcessManagerHold& hold)
808 : Manager(context, cfg, router)
809 {
810 for (const auto& req_c : hold.required_client()) required_clients_.insert(req_c);
811 }
812
813 void run();
814
818
820
821 private:
822 std::set<std::string> reported_clients_;
823 std::set<std::string> required_clients_;
824
825 zmq::context_t& context_;
827 const Router& router_;
828
829 std::vector<zmq::pollitem_t> poll_items_;
830 enum
831 {
832 SOCKET_MANAGER = 0,
833 SOCKET_SUBSCRIBE = 1,
834 };
835 enum
836 {
837 NUMBER_SOCKETS = 2
838 };
839
840 std::unique_ptr<zmq::socket_t> manager_socket_;
841 std::unique_ptr<zmq::socket_t> subscribe_socket_;
842 std::unique_ptr<zmq::socket_t> publish_socket_;
843
844 std::string zmq_filter_req_{make_identifier(
846 protobuf::ManagerRequest, middleware::scheme<protobuf::ManagerRequest>()>::type_name(),
847 middleware::scheme<protobuf::ManagerRequest>(), groups::manager_request,
848 IdentifierWildcard::PROCESS_THREAD_WILDCARD, std::to_string(getpid()))};
849
850 std::string zmq_filter_rep_{
851 make_identifier(middleware::SerializerParserHelper<
852 protobuf::ManagerResponse,
853 middleware::scheme<protobuf::ManagerResponse>()>::type_name(),
854 middleware::scheme<protobuf::ManagerResponse>(), groups::manager_response,
855 IdentifierWildcard::NO_WILDCARDS, std::to_string(getpid())) +
856 std::string(1, '\0')};
857}; // namespace zeromq
858
859template <typename InnerTransporter = middleware::NullTransporter>
862
863} // namespace zeromq
864} // namespace goby
865
866#endif
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:60
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition publisher.h:40
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
Definition subscriber.h:37
void set_lock_action(logger_lock::LockAction lock_action)
InterProcessPortalImplementation(const protobuf::InterProcessPortalConfig &cfg)
bool hold_state()
When using hold functionality, returns whether the system is holding (true) and thus waiting for all ...
InterProcessPortalImplementation(InnerTransporter &inner, const protobuf::InterProcessPortalConfig &cfg)
void ready()
When using hold functionality, call when the process is ready to receive publications (typically done...
void set_publish_cfg(const protobuf::Socket &cfg)
void unsubscribe(const std::string &identifier)
std::deque< protobuf::InprocControl > & control_buffer()
InterProcessPortalMainThread(zmq::context_t &context)
void subscribe(const std::string &identifier)
void publish(const std::string &identifier, const char *bytes, int size, bool ignore_buffer=false)
void send_control_msg(const protobuf::InprocControl &control)
bool recv(protobuf::InprocControl *control_msg, zmq_recv_flags_type flags=zmq_recv_flags_type())
InterProcessPortalReadThread(const protobuf::InterProcessPortalConfig &cfg, zmq::context_t &context, std::atomic< bool > &alive, std::shared_ptr< std::condition_variable_any > poller_cv)
Manager(zmq::context_t &context, const protobuf::InterProcessPortalConfig &cfg, const Router &router, const protobuf::InterProcessManagerHold &hold)
protobuf::Socket subscribe_socket_cfg()
Manager(zmq::context_t &context, const protobuf::InterProcessPortalConfig &cfg, const Router &router)
protobuf::Socket publish_socket_cfg()
protobuf::ManagerResponse handle_request(const protobuf::ManagerRequest &pb_request)
Router & operator=(Router &)=delete
Router(Router &)=delete
std::atomic< unsigned > sub_port
unsigned last_port(zmq::socket_t &socket)
Router(zmq::context_t &context, const protobuf::InterProcessPortalConfig &cfg)
std::atomic< unsigned > pub_port
static constexpr InprocControlType REQUEST_HOLD_STATE
static constexpr InprocControlType RECEIVE
::goby::zeromq::protobuf::InprocControl_InprocControlType type() const
static constexpr InprocControlType PUB_CONFIGURATION
const ::goby::zeromq::protobuf::Socket & publish_socket() const
const std::string & required_client(int index) const
goby::util::logger::GroupSetter group(std::string n)
constexpr int scheme()
Placeholder to provide an interface for the scheme() function family.
Definition cstr.h:65
std::string thread_id(std::thread::id i=std::this_thread::get_id())
Definition common.h:53
constexpr goby::middleware::Group manager_response
constexpr goby::middleware::Group manager_request
std::string identifier_part_to_string(int i)
constexpr const char * delimiter_str
constexpr char delimiter
int zmq_send_flags_type
int zmq_recv_flags_type
void setup_socket(zmq::socket_t &socket, const protobuf::Socket &cfg)
constexpr char delimiter_substitute
std::string make_identifier(const std::string &type_name, int scheme, const std::string &group, IdentifierWildcard wildcard, const std::string &process, std::unordered_map< int, std::string > *schemes_buffer=nullptr, std::unordered_map< std::thread::id, std::string > *threads_buffer=nullptr)
const std::string & id_component(const Key &k, std::unordered_map< Key, std::string > &map)
Given key, find the string in the map, or create it (to_string) and store it, and return the string.
The global namespace for the Goby project.
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::PROTOBUF_NAMESPACE_ID::MessageOptions, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
util::FlexOstream glog
Access the Goby logger through this object.
type
Generic JSON types used in JWTs.
Definition jwt.h:2072
static int from_string(const std::string &s)
Convert from a string to a marshalling scheme id.
Definition interface.h:77
static std::string to_string(int e)
Convert a known marshalling scheme to a human-readable string or an unknown scheme to the string repr...
Definition interface.h:67
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
Definition interface.h:98
static std::string type_name()
The marshalling scheme specific string name for this type.
Definition interface.h:107
std::chrono::time_point< SystemClock > time_point
static time_point now() noexcept
Returns the current system time unless SimulatorSettings::using_sim_time is set to true,...
std::chrono::microseconds duration
Duration type.