Goby3  3.1.4
2024.02.22
interprocess.h
Go to the documentation of this file.
1 // Copyright 2016-2023:
2 // GobySoft, LLC (2013-)
3 // Community contributors (see AUTHORS file)
4 // File authors:
5 // Toby Schneider <toby@gobysoft.org>
6 //
7 //
8 // This file is part of the Goby Underwater Autonomy Project Libraries
9 // ("The Goby Libraries").
10 //
11 // The Goby Libraries are free software: you can redistribute them and/or modify
12 // them under the terms of the GNU Lesser General Public License as published by
13 // the Free Software Foundation, either version 2.1 of the License, or
14 // (at your option) any later version.
15 //
16 // The Goby Libraries are distributed in the hope that they will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU Lesser General Public License for more details.
20 //
21 // You should have received a copy of the GNU Lesser General Public License
22 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
23 
24 #ifndef GOBY_ZEROMQ_TRANSPORT_INTERPROCESS_H
25 #define GOBY_ZEROMQ_TRANSPORT_INTERPROCESS_H
26 
28 
29 #include <atomic> // for atomic
30 #include <chrono> // for mill...
31 #include <condition_variable> // for cond...
32 #include <deque> // for deque
33 #include <functional> // for func...
34 #include <iosfwd> // for size_t
35 #include <memory> // for shar...
36 #include <mutex> // for time...
37 #include <set> // for set
38 #include <string> // for string
39 #include <thread> // for get_id
40 #include <tuple> // for make...
41 #include <unistd.h> // for getpid
42 #include <unordered_map> // for unor...
43 #include <utility> // for make...
44 #include <vector> // for vector
45 
46 #include <zmq.h> // for ZMQ_...
47 #include <zmq.hpp> // for sock...
48 
49 #include "goby/middleware/common.h" // for thre...
50 #include "goby/middleware/group.h" // for Group
51 #include "goby/middleware/marshalling/interface.h" // for Seri...
54 #include "goby/middleware/transport/interface.h" // for Poll...
55 #include "goby/middleware/transport/interprocess.h" // for Inte...
56 #include "goby/middleware/transport/null.h" // for Null...
58 #include "goby/middleware/transport/subscriber.h" // for Subs...
59 #include "goby/time/system_clock.h" // for Syst...
60 #include "goby/util/debug_logger/flex_ostream.h" // for Flex...
61 #include "goby/util/debug_logger/flex_ostreambuf.h" // for lock
62 #include "goby/zeromq/protobuf/interprocess_config.pb.h" // for Inte...
63 #include "goby/zeromq/protobuf/interprocess_zeromq.pb.h" // for Inpr...
64 
65 #if ZMQ_VERSION <= ZMQ_MAKE_VERSION(4, 3, 1)
66 #define USE_OLD_ZMQ_CPP_API
67 #endif
68 
69 #if CPPZMQ_VERSION < ZMQ_MAKE_VERSION(4, 7, 1)
70 #define USE_OLD_CPPZMQ_SETSOCKOPT
71 #endif
72 
73 #if CPPZMQ_VERSION < ZMQ_MAKE_VERSION(4, 8, 0)
74 #define USE_OLD_CPPZMQ_POLL
75 #endif
76 
77 namespace goby
78 {
79 namespace middleware
80 {
81 template <typename Data> class Publisher;
82 } // namespace middleware
83 
84 namespace zeromq
85 {
86 namespace groups
87 {
88 constexpr goby::middleware::Group manager_request{"goby::zeromq::_internal_manager_request"};
89 constexpr goby::middleware::Group manager_response{"goby::zeromq::_internal_manager_response"};
90 } // namespace groups
91 
92 void setup_socket(zmq::socket_t& socket, const protobuf::Socket& cfg);
93 
95 {
99 };
100 
101 // scheme
102 inline std::string identifier_part_to_string(int i)
103 {
105 }
106 inline std::string identifier_part_to_string(std::thread::id i)
107 {
108  return goby::middleware::thread_id(i);
109 }
110 
112 template <typename Key>
113 const std::string& id_component(const Key& k, std::unordered_map<Key, std::string>& map)
114 {
115  auto it = map.find(k);
116  if (it != map.end())
117  return it->second;
118 
119  std::string v = identifier_part_to_string(k) + "/";
120  auto it_pair = map.insert(std::make_pair(k, v));
121  return it_pair.first->second;
122 }
123 
124 inline std::string
125 make_identifier(const std::string& type_name, int scheme, const std::string& group,
126  IdentifierWildcard wildcard, const std::string& process,
127  std::unordered_map<int, std::string>* schemes_buffer = nullptr,
128  std::unordered_map<std::thread::id, std::string>* threads_buffer = nullptr)
129 {
130  switch (wildcard)
131  {
132  default:
134  {
135  auto thread = std::this_thread::get_id();
136  return ("/" + group + "/" +
137  (schemes_buffer ? id_component(scheme, *schemes_buffer)
138  : std::string(identifier_part_to_string(scheme) + "/")) +
139  type_name + "/" + process + "/" +
140  (threads_buffer ? id_component(thread, *threads_buffer)
141  : std::string(identifier_part_to_string(thread) + "/")));
142  }
144  {
145  return ("/" + group + "/" +
146  (schemes_buffer ? id_component(scheme, *schemes_buffer)
147  : std::string(identifier_part_to_string(scheme) + "/")) +
148  type_name + "/" + process + "/");
149  }
151  {
152  return ("/" + group + "/" +
153  (schemes_buffer ? id_component(scheme, *schemes_buffer)
154  : std::string(identifier_part_to_string(scheme) + "/")) +
155  type_name + "/");
156  }
157  }
158 }
159 
160 #ifdef USE_OLD_ZMQ_CPP_API
163 #else
164 using zmq_recv_flags_type = zmq::recv_flags;
165 using zmq_send_flags_type = zmq::send_flags;
166 #endif
167 
168 // run in the same thread as InterProcessPortal
170 {
171  public:
172  InterProcessPortalMainThread(zmq::context_t& context);
174  {
175 #ifdef USE_OLD_CPPZMQ_SETSOCKOPT
176  control_socket_.setsockopt(ZMQ_LINGER, 0);
177  publish_socket_.setsockopt(ZMQ_LINGER, 0);
178 #else
179  control_socket_.set(zmq::sockopt::linger, 0);
180  publish_socket_.set(zmq::sockopt::linger, 0);
181 #endif
182  }
183 
184  bool publish_ready() { return !hold_; }
185  bool subscribe_ready() { return have_pubsub_sockets_; }
186 
187  bool recv(protobuf::InprocControl* control_msg,
189  void set_publish_cfg(const protobuf::Socket& cfg);
190 
191  void set_hold_state(bool hold);
192  bool hold_state() { return hold_; }
193 
194  void publish(const std::string& identifier, const char* bytes, int size,
195  bool ignore_buffer = false);
196  void subscribe(const std::string& identifier);
197  void unsubscribe(const std::string& identifier);
198  void reader_shutdown();
199 
200  std::deque<protobuf::InprocControl>& control_buffer() { return control_buffer_; }
201  void send_control_msg(const protobuf::InprocControl& control);
202 
203  private:
204  private:
205  zmq::socket_t control_socket_;
206  zmq::socket_t publish_socket_;
207  bool hold_{true};
208  bool have_pubsub_sockets_{false};
209 
210  std::deque<std::pair<std::string, std::vector<char>>>
211  publish_queue_; //used before hold == false
212 
213  // buffer messages while waiting for (un)subscribe ack
214  std::deque<protobuf::InprocControl> control_buffer_;
215 };
216 
217 // run in a separate thread to allow zmq_.poll() to block without interrupting the main thread
219 {
220  public:
222  zmq::context_t& context, std::atomic<bool>& alive,
223  std::shared_ptr<std::condition_variable_any> poller_cv);
224  void run();
226  {
227 #ifdef USE_OLD_CPPZMQ_SETSOCKOPT
228  control_socket_.setsockopt(ZMQ_LINGER, 0);
229  subscribe_socket_.setsockopt(ZMQ_LINGER, 0);
230  manager_socket_.setsockopt(ZMQ_LINGER, 0);
231 #else
232  control_socket_.set(zmq::sockopt::linger, 0);
233  subscribe_socket_.set(zmq::sockopt::linger, 0);
234  manager_socket_.set(zmq::sockopt::linger, 0);
235 #endif
236  }
237 
238  private:
239  void poll(long timeout_ms = -1);
240  void control_data(const zmq::message_t& zmq_msg);
241  void subscribe_data(const zmq::message_t& zmq_msg);
242  void manager_data(const zmq::message_t& zmq_msg);
243  void send_control_msg(const protobuf::InprocControl& control);
244  void send_manager_request(const protobuf::ManagerRequest& req);
245 
246  private:
248  zmq::socket_t control_socket_;
249  zmq::socket_t subscribe_socket_;
250  zmq::socket_t manager_socket_;
251  std::atomic<bool>& alive_;
252  std::shared_ptr<std::condition_variable_any> poller_cv_;
253  std::vector<zmq::pollitem_t> poll_items_;
254  enum
255  {
256  SOCKET_CONTROL = 0,
257  SOCKET_MANAGER = 1,
258  SOCKET_SUBSCRIBE = 2
259  };
260  enum
261  {
262  NUMBER_SOCKETS = 3
263  };
264  bool have_pubsub_sockets_{false};
265  bool hold_{true};
266  bool manager_waiting_for_reply_{false};
267 
268  goby::time::SystemClock::time_point next_hold_state_request_time_{
270  const goby::time::SystemClock::duration hold_state_request_period_{
271  std::chrono::milliseconds(100)};
272 };
273 
274 template <typename InnerTransporter,
275  template <typename Derived, typename InnerTransporterType> class PortalBase>
277  : public PortalBase<InterProcessPortalImplementation<InnerTransporter, PortalBase>,
278  InnerTransporter>
279 {
280  public:
282  InnerTransporter>;
283 
285  : cfg_(cfg),
286  zmq_context_(cfg.zeromq_number_io_threads()),
287  zmq_main_(zmq_context_),
288  zmq_read_thread_(cfg_, zmq_context_, zmq_alive_, middleware::PollerInterface::cv())
289  {
290  _init();
291  }
292 
293  InterProcessPortalImplementation(InnerTransporter& inner,
295  : Base(inner),
296  cfg_(cfg),
297  zmq_context_(cfg.zeromq_number_io_threads()),
298  zmq_main_(zmq_context_),
299  zmq_read_thread_(cfg_, zmq_context_, zmq_alive_, middleware::PollerInterface::cv())
300  {
301  _init();
302  }
303 
305  {
306  if (zmq_thread_)
307  {
308  zmq_main_.reader_shutdown();
309  zmq_thread_->join();
310  }
311  }
312 
314  void ready() { ready_ = true; }
315 
317  bool hold_state() { return zmq_main_.hold_state(); }
318 
319  friend Base;
320  friend typename Base::Base;
321 
322  private:
323  void _init()
324  {
326 
327  // start zmq read thread
328  zmq_thread_ = std::make_unique<std::thread>([this]() { zmq_read_thread_.run(); });
329 
330  while (!zmq_main_.subscribe_ready())
331  {
332  protobuf::InprocControl control_msg;
333  if (zmq_main_.recv(&control_msg))
334  {
335  switch (control_msg.type())
336  {
338  zmq_main_.set_publish_cfg(control_msg.publish_socket());
339  break;
340  default: break;
341  }
342  }
343  }
344 
345  //
346  // Handle hold state request/response using pub sub so that we ensure
347  // publishing and subscribe is completely functional before releasing the hold
348  //
349  _subscribe<protobuf::ManagerResponse, middleware::MarshallingScheme::PROTOBUF>(
350  [this](std::shared_ptr<const protobuf::ManagerResponse> response) {
351  goby::glog.is_debug3() && goby::glog << "Received ManagerResponse: "
352  << response->ShortDebugString() << std::endl;
353  if (response->request() == protobuf::PROVIDE_HOLD_STATE &&
354  response->client_pid() == getpid() &&
355  response->client_name() == cfg_.client_name())
356  {
357  zmq_main_.set_hold_state(response->hold());
358  }
359 
360  // we're good to go now, so let's unsubscribe to this group
361  if (zmq_main_.publish_ready())
362  {
363  _unsubscribe<protobuf::ManagerResponse,
364  middleware::MarshallingScheme::PROTOBUF>(
365  groups::manager_response,
366  middleware::Subscriber<protobuf::ManagerResponse>());
367  }
368  },
370  }
371 
372  template <typename Data, int scheme>
373  void _publish(const Data& d, const goby::middleware::Group& group,
374  const middleware::Publisher<Data>& /*publisher*/, bool ignore_buffer = false)
375  {
378  _publish_serialized(type_name, scheme, bytes, group, ignore_buffer);
379  }
380 
381  void _publish_serialized(std::string type_name, int scheme, const std::vector<char>& bytes,
382  const goby::middleware::Group& group, bool ignore_buffer = false)
383  {
384  std::string identifier = _make_fully_qualified_identifier(type_name, scheme, group) + '\0';
385  zmq_main_.publish(identifier, &bytes[0], bytes.size(), ignore_buffer);
386  }
387 
388  template <typename Data, int scheme>
389  void _subscribe(std::function<void(std::shared_ptr<const Data> d)> f,
391  const middleware::Subscriber<Data>& /*subscriber*/)
392  {
393  std::string identifier =
394  _make_identifier<Data, scheme>(group, IdentifierWildcard::PROCESS_THREAD_WILDCARD);
395 
396  auto subscription = std::make_shared<middleware::SerializationSubscription<Data, scheme>>(
397  f, group,
398  middleware::Subscriber<Data>(goby::middleware::protobuf::TransporterConfig(),
399  [=](const Data& /*d*/) { return group; }));
400 
401  if (forwarder_subscriptions_.count(identifier) == 0 &&
402  portal_subscriptions_.count(identifier) == 0)
403  zmq_main_.subscribe(identifier);
404  portal_subscriptions_.insert(std::make_pair(identifier, subscription));
405  }
406 
407  std::shared_ptr<middleware::SerializationSubscriptionRegex> _subscribe_regex(
408  std::function<void(const std::vector<unsigned char>&, int scheme, const std::string& type,
410  f,
411  const std::set<int>& schemes, const std::string& type_regex, const std::string& group_regex)
412  {
413  auto new_sub = std::make_shared<middleware::SerializationSubscriptionRegex>(
414  f, schemes, type_regex, group_regex);
415  _subscribe_regex(new_sub);
416  return new_sub;
417  }
418 
419  template <typename Data, int scheme>
420  void _unsubscribe(
422  const middleware::Subscriber<Data>& /*subscriber*/ = middleware::Subscriber<Data>())
423  {
424  std::string identifier =
425  _make_identifier<Data, scheme>(group, IdentifierWildcard::PROCESS_THREAD_WILDCARD);
426 
427  portal_subscriptions_.erase(identifier);
428 
429  // If no forwarded subscriptions, do the actual unsubscribe
430  if (forwarder_subscriptions_.count(identifier) == 0)
431  zmq_main_.unsubscribe(identifier);
432  }
433 
434  void _unsubscribe_all(
435  const std::string& subscriber_id = identifier_part_to_string(std::this_thread::get_id()))
436  {
437  // portal unsubscribe
438  if (subscriber_id == identifier_part_to_string(std::this_thread::get_id()))
439  {
440  for (const auto& p : portal_subscriptions_)
441  {
442  const auto& identifier = p.first;
443  if (forwarder_subscriptions_.count(identifier) == 0)
444  zmq_main_.unsubscribe(identifier);
445  }
446  portal_subscriptions_.clear();
447  }
448  else // forwarder unsubscribe
449  {
450  while (forwarder_subscription_identifiers_[subscriber_id].size() > 0)
451  _forwarder_unsubscribe(
452  subscriber_id,
453  forwarder_subscription_identifiers_[subscriber_id].begin()->first);
454  }
455 
456  // regex
457  if (regex_subscriptions_.size() > 0)
458  {
459  regex_subscriptions_.erase(subscriber_id);
460  if (regex_subscriptions_.empty())
461  zmq_main_.unsubscribe("/");
462  }
463  }
464 
465  int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock)
466  {
467  int items = 0;
468  protobuf::InprocControl new_control_msg;
469 
470 #ifdef USE_OLD_ZMQ_CPP_API
471  int flags = ZMQ_NOBLOCK;
472 #else
473  auto flags = zmq::recv_flags::dontwait;
474 #endif
475 
476  while (zmq_main_.recv(&new_control_msg, flags))
477  zmq_main_.control_buffer().push_back(new_control_msg);
478 
479  while (!zmq_main_.control_buffer().empty())
480  {
481  const auto& control_msg = zmq_main_.control_buffer().front();
482  switch (control_msg.type())
483  {
485  {
486  ++items;
487  if (lock)
488  lock.reset();
489 
490  const auto& data = control_msg.received_data();
491 
492  std::string group, type, thread;
493  int scheme, process;
494  std::tie(group, scheme, type, process, thread) = parse_identifier(data);
495  std::string identifier = _make_identifier(
497 
498  // build a set so if any of the handlers unsubscribes, we still have a pointer to the middleware::SerializationHandlerBase<>
499  std::vector<std::weak_ptr<const middleware::SerializationHandlerBase<>>>
500  subs_to_post;
501  auto portal_range = portal_subscriptions_.equal_range(identifier);
502  for (auto it = portal_range.first; it != portal_range.second; ++it)
503  subs_to_post.push_back(it->second);
504  auto forwarder_it = forwarder_subscriptions_.find(identifier);
505  if (forwarder_it != forwarder_subscriptions_.end())
506  subs_to_post.push_back(forwarder_it->second);
507 
508  // actually post the data
509  {
510  const auto& data = control_msg.received_data();
511  auto null_delim_it = std::find(std::begin(data), std::end(data), '\0');
512  for (auto& sub : subs_to_post)
513  {
514  if (auto sub_sp = sub.lock())
515  sub_sp->post(null_delim_it + 1, data.end());
516  }
517  }
518 
519  if (!regex_subscriptions_.empty())
520  {
521  auto null_delim_it = std::find(std::begin(data), std::end(data), '\0');
522 
523  bool forwarder_subscription_posted = false;
524  for (auto& sub : regex_subscriptions_)
525  {
526  // only post at most once for forwarders as the threads will filter
527  bool is_forwarded_sub =
528  sub.first != identifier_part_to_string(std::this_thread::get_id());
529  if (is_forwarded_sub && forwarder_subscription_posted)
530  continue;
531 
532  if (sub.second->post(null_delim_it + 1, data.end(), scheme, type,
533  group) &&
534  is_forwarded_sub)
535  forwarder_subscription_posted = true;
536  }
537  }
538  }
539  break;
540 
542  {
543  protobuf::ManagerRequest req;
544 
545  req.set_ready(ready_);
546  req.set_request(protobuf::PROVIDE_HOLD_STATE);
547  req.set_client_name(cfg_.client_name());
548  req.set_client_pid(getpid());
549 
550  goby::glog.is_debug3() && goby::glog << "Published ManagerRequest: "
551  << req.ShortDebugString() << std::endl;
552 
553  _publish<protobuf::ManagerRequest, middleware::MarshallingScheme::PROTOBUF>(
555  middleware::Publisher<protobuf::ManagerRequest>(), true);
556  }
557  break;
558 
559  default: break;
560  }
561  zmq_main_.control_buffer().pop_front();
562  }
563  return items;
564  }
565 
566  void _receive_publication_forwarded(
568  {
569  std::string identifier =
570  _make_identifier(msg.key().type(), msg.key().marshalling_scheme(), msg.key().group(),
572  '\0';
573  auto& bytes = msg.data();
574  zmq_main_.publish(identifier, &bytes[0], bytes.size());
575  }
576 
577  void _receive_subscription_forwarded(
578  const std::shared_ptr<const middleware::SerializationHandlerBase<>>& subscription)
579  {
580  std::string identifier = _make_identifier(subscription->type_name(), subscription->scheme(),
581  subscription->subscribed_group(),
583 
584  goby::glog.is_debug2() &&
585  goby::glog << "Received subscription forwarded for identifier [" << identifier
586  << "] from subscriber id: " << subscription->subscriber_id() << std::endl;
587 
588  switch (subscription->action())
589  {
591  {
592  // insert if this thread hasn't already subscribed
593  if (forwarder_subscription_identifiers_[subscription->subscriber_id()].count(
594  identifier) == 0)
595  {
596  // first to subscribe from a Forwarder
597  if (forwarder_subscriptions_.count(identifier) == 0)
598  {
599  // first to subscribe (locally or forwarded)
600  if (portal_subscriptions_.count(identifier) == 0)
601  zmq_main_.subscribe(identifier);
602 
603  // create Forwarder subscription
604  forwarder_subscriptions_.insert(std::make_pair(identifier, subscription));
605  }
606  forwarder_subscription_identifiers_[subscription->subscriber_id()].insert(
607  std::make_pair(identifier, forwarder_subscriptions_.find(identifier)));
608  }
609  }
610  break;
611 
613  {
614  _forwarder_unsubscribe(subscription->subscriber_id(), identifier);
615  }
616  break;
617 
618  default: break;
619  }
620  }
621 
622  void _forwarder_unsubscribe(const std::string& subscriber_id, const std::string& identifier)
623  {
624  auto it = forwarder_subscription_identifiers_[subscriber_id].find(identifier);
625  if (it != forwarder_subscription_identifiers_[subscriber_id].end())
626  {
627  bool no_forwarder_subscribers = true;
628  for (const auto& p : forwarder_subscription_identifiers_)
629  {
630  if (p.second.count(identifier) != 0)
631  {
632  no_forwarder_subscribers = false;
633  break;
634  }
635  }
636 
637  // if no Forwarder subscriptions left
638  if (no_forwarder_subscribers)
639  {
640  // erase the Forwarder subscription
641  forwarder_subscriptions_.erase(it->second);
642 
643  // do the actual unsubscribe if we aren't subscribe locally as well
644  if (portal_subscriptions_.count(identifier) == 0)
645  zmq_main_.unsubscribe(identifier);
646  }
647 
648  forwarder_subscription_identifiers_[subscriber_id].erase(it);
649  }
650  }
651 
652  void _receive_regex_subscription_forwarded(
653  std::shared_ptr<const middleware::SerializationSubscriptionRegex> subscription)
654  {
655  _subscribe_regex(subscription);
656  }
657 
658  void _subscribe_regex(
659  const std::shared_ptr<const middleware::SerializationSubscriptionRegex>& new_sub)
660  {
661  if (regex_subscriptions_.empty())
662  zmq_main_.subscribe("/");
663 
664  regex_subscriptions_.insert(std::make_pair(new_sub->subscriber_id(), new_sub));
665  }
666 
667  template <typename Data, int scheme>
668  std::string _make_identifier(const goby::middleware::Group& group, IdentifierWildcard wildcard)
669  {
671  scheme, group, wildcard);
672  }
673 
674  std::string _make_fully_qualified_identifier(const std::string& type_name, int scheme,
675  const std::string& group)
676  {
677  return _make_identifier(type_name, scheme, group, IdentifierWildcard::THREAD_WILDCARD) +
678  id_component(std::this_thread::get_id(), threads_);
679  }
680 
681  template <typename Data, int scheme>
682  std::string _make_identifier(const Data& d, const goby::middleware::Group& group,
683  IdentifierWildcard wildcard)
684  {
686  scheme, group, wildcard);
687  }
688 
689  std::string _make_identifier(const std::string& type_name, int scheme, const std::string& group,
690  IdentifierWildcard wildcard)
691  {
692  return make_identifier(type_name, scheme, group, wildcard, process_, &schemes_, &threads_);
693  }
694 
695  // group, scheme, type, process, thread
696  std::tuple<std::string, int, std::string, int, std::size_t>
697  parse_identifier(const std::string& identifier)
698  {
699  const int number_elements = 5;
700  std::string::size_type previous_slash = 0;
701  std::vector<std::string> elem;
702  for (auto i = 0; i < number_elements; ++i)
703  {
704  auto slash_pos = identifier.find('/', previous_slash + 1);
705  elem.push_back(identifier.substr(previous_slash + 1, slash_pos - (previous_slash + 1)));
706  previous_slash = slash_pos;
707  }
708  return std::make_tuple(elem[0], middleware::MarshallingScheme::from_string(elem[1]),
709  elem[2], std::stoi(elem[3]), std::stoull(elem[4], nullptr, 16));
710  }
711 
712  private:
713  const protobuf::InterProcessPortalConfig cfg_;
714 
715  std::unique_ptr<std::thread> zmq_thread_;
716  std::atomic<bool> zmq_alive_{true};
717  zmq::context_t zmq_context_;
718  InterProcessPortalMainThread zmq_main_;
719  InterProcessPortalReadThread zmq_read_thread_;
720 
721  // maps identifier to subscription
722  std::unordered_multimap<std::string,
723  std::shared_ptr<const middleware::SerializationHandlerBase<>>>
724  portal_subscriptions_;
725  // only one subscription for each forwarded identifier
726  std::unordered_map<std::string, std::shared_ptr<const middleware::SerializationHandlerBase<>>>
727  forwarder_subscriptions_;
728  std::unordered_map<
729  std::string, std::unordered_map<std::string, typename decltype(
730  forwarder_subscriptions_)::const_iterator>>
731  forwarder_subscription_identifiers_;
732 
733  std::unordered_multimap<std::string,
734  std::shared_ptr<const middleware::SerializationSubscriptionRegex>>
735  regex_subscriptions_;
736  std::string process_{std::to_string(getpid())};
737  std::unordered_map<int, std::string> schemes_;
738  std::unordered_map<std::thread::id, std::string> threads_;
739 
740  bool ready_{false};
741 };
742 
743 class Router
744 {
745  public:
746  Router(zmq::context_t& context, const protobuf::InterProcessPortalConfig& cfg)
747  : context_(context), cfg_(cfg)
748  {
749  }
750 
751  void run();
752  unsigned last_port(zmq::socket_t& socket);
753 
754  Router(Router&) = delete;
755  Router& operator=(Router&) = delete;
756 
757  public:
758  std::atomic<unsigned> pub_port{0};
759  std::atomic<unsigned> sub_port{0};
760 
761  private:
762  zmq::context_t& context_;
764 };
765 
766 class Manager
767 {
768  public:
769  Manager(zmq::context_t& context, const protobuf::InterProcessPortalConfig& cfg,
770  const Router& router);
771 
772  Manager(zmq::context_t& context, const protobuf::InterProcessPortalConfig& cfg,
773  const Router& router, const protobuf::InterProcessManagerHold& hold)
774  : Manager(context, cfg, router)
775  {
776  for (const auto& req_c : hold.required_client()) required_clients_.insert(req_c);
777  }
778 
779  void run();
780 
784 
785  bool hold_state();
786 
787  private:
788  std::set<std::string> reported_clients_;
789  std::set<std::string> required_clients_;
790 
791  zmq::context_t& context_;
793  const Router& router_;
794 
795  std::vector<zmq::pollitem_t> poll_items_;
796  enum
797  {
798  SOCKET_MANAGER = 0,
799  SOCKET_SUBSCRIBE = 1,
800  };
801  enum
802  {
803  NUMBER_SOCKETS = 2
804  };
805 
806  std::unique_ptr<zmq::socket_t> manager_socket_;
807  std::unique_ptr<zmq::socket_t> subscribe_socket_;
808  std::unique_ptr<zmq::socket_t> publish_socket_;
809 
810  std::string zmq_filter_req_{make_identifier(
811  middleware::SerializerParserHelper<
812  protobuf::ManagerRequest, middleware::scheme<protobuf::ManagerRequest>()>::type_name(),
813  middleware::scheme<protobuf::ManagerRequest>(), groups::manager_request,
815 
816  std::string zmq_filter_rep_{
817  make_identifier(middleware::SerializerParserHelper<
818  protobuf::ManagerResponse,
819  middleware::scheme<protobuf::ManagerResponse>()>::type_name(),
820  middleware::scheme<protobuf::ManagerResponse>(), groups::manager_response,
822  std::string(1, '\0')};
823 }; // namespace zeromq
824 
825 template <typename InnerTransporter = middleware::NullTransporter>
826 using InterProcessPortal =
828 
829 } // namespace zeromq
830 } // namespace goby
831 
832 #endif
goby::zeromq::InterProcessPortalMainThread::set_hold_state
void set_hold_state(bool hold)
goby::time::SystemClock::now
static time_point now() noexcept
Returns the current system time unless SimulatorSettings::using_sim_time is set to true,...
Definition: system_clock.h:63
goby::zeromq::protobuf::InprocControl::publish_socket
const ::goby::zeromq::protobuf::Socket & publish_socket() const
Definition: interprocess_zeromq.pb.h:1896
goby::zeromq::InterProcessPortalMainThread::~InterProcessPortalMainThread
~InterProcessPortalMainThread()
Definition: interprocess.h:173
goby::zeromq::protobuf::InterProcessPortalConfig
Definition: interprocess_config.pb.h:97
goby::zeromq::Manager::subscribe_socket_cfg
protobuf::Socket subscribe_socket_cfg()
goby::zeromq::InterProcessPortalImplementation::Base
friend Base
Definition: interprocess.h:319
system_clock.h
goby::zeromq::InterProcessPortalMainThread::InterProcessPortalMainThread
InterProcessPortalMainThread(zmq::context_t &context)
goby::zeromq::protobuf::InterProcessManagerHold::required_client
const ::std::string & required_client(int index) const
Definition: interprocess_config.pb.h:1082
goby::zeromq::InterProcessPortalMainThread::set_publish_cfg
void set_publish_cfg(const protobuf::Socket &cfg)
goby::middleware::protobuf::SerializerTransporterMessage
Definition: serializer_transporter.pb.h:462
goby::zeromq::groups::manager_response
constexpr goby::middleware::Group manager_response
Definition: interprocess.h:89
goby::zeromq::InterProcessPortalMainThread::send_control_msg
void send_control_msg(const protobuf::InprocControl &control)
interface.h
goby::zeromq::protobuf::InterProcessPortalConfig::client_name
const ::std::string & client_name() const
Definition: interprocess_config.pb.h:1010
goby
The global namespace for the Goby project.
Definition: acomms_constants.h:33
goby::middleware::SerializerParserHelper
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
Definition: interface.h:97
goby::zeromq::IdentifierWildcard::PROCESS_THREAD_WILDCARD
@ PROCESS_THREAD_WILDCARD
goby::zeromq::protobuf::InprocControl::REQUEST_HOLD_STATE
static const InprocControlType REQUEST_HOLD_STATE
Definition: interprocess_zeromq.pb.h:962
goby::zeromq::IdentifierWildcard::NO_WILDCARDS
@ NO_WILDCARDS
goby::zeromq::identifier_part_to_string
std::string identifier_part_to_string(int i)
Definition: interprocess.h:102
goby::middleware::SerializationHandlerBase::SubscriptionAction::SUBSCRIBE
@ SUBSCRIBE
goby::middleware::Subscriber
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
Definition: subscriber.h:36
goby::zeromq::protobuf::InprocControl::type
::goby::zeromq::protobuf::InprocControl_InprocControlType type() const
Definition: interprocess_zeromq.pb.h:1868
goby::zeromq::groups::manager_request
constexpr goby::middleware::Group manager_request
Definition: interprocess.h:88
goby::zeromq::Router::sub_port
std::atomic< unsigned > sub_port
Definition: interprocess.h:759
group.h
protobuf.h
goby::util::FlexOstream::is_debug3
bool is_debug3()
Definition: flex_ostream.h:86
goby::zeromq::Router::Router
Router(zmq::context_t &context, const protobuf::InterProcessPortalConfig &cfg)
Definition: interprocess.h:746
goby::zeromq::InterProcessPortalMainThread
Definition: interprocess.h:169
group
goby::util::logger::GroupSetter group(std::string n)
Definition: logger_manipulators.h:134
goby::middleware::Publisher
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition: driver_thread.h:69
goby::zeromq::Manager::Manager
Manager(zmq::context_t &context, const protobuf::InterProcessPortalConfig &cfg, const Router &router)
subscriber.h
flex_ostreambuf.h
goby::zeromq::Router::operator=
Router & operator=(Router &)=delete
goby::zeromq::InterProcessPortalReadThread::~InterProcessPortalReadThread
~InterProcessPortalReadThread()
Definition: interprocess.h:225
goby::zeromq::Manager::publish_socket_cfg
protobuf::Socket publish_socket_cfg()
goby::util::FlexOstream::is_debug2
bool is_debug2()
Definition: flex_ostream.h:85
goby::zeromq::InterProcessPortalMainThread::publish
void publish(const std::string &identifier, const char *bytes, int size, bool ignore_buffer=false)
goby::zeromq::InterProcessPortalReadThread
Definition: interprocess.h:218
goby::time::SystemClock::duration
std::chrono::microseconds duration
Duration type.
Definition: system_clock.h:52
transporter_config.pb.h
goby::middleware::protobuf::TransporterConfig
Definition: transporter_config.pb.h:74
goby::middleware::thread_id
std::string thread_id(std::thread::id i=std::this_thread::get_id())
Definition: common.h:53
goby::util::logger_lock::lock
@ lock
Definition: flex_ostreambuf.h:62
interprocess_config.pb.h
goby::zeromq::setup_socket
void setup_socket(zmq::socket_t &socket, const protobuf::Socket &cfg)
goby::zeromq::InterProcessPortal
InterProcessPortalImplementation< InnerTransporter, middleware::InterProcessPortalBase > InterProcessPortal
Definition: interprocess.h:827
goby::zeromq::protobuf::InprocControl::RECEIVE
static const InprocControlType RECEIVE
Definition: interprocess_zeromq.pb.h:958
goby::zeromq::id_component
const std::string & id_component(const Key &k, std::unordered_map< Key, std::string > &map)
Given key, find the string in the map, or create it (to_string) and store it, and return the string.
Definition: interprocess.h:113
goby::zeromq::Manager::handle_request
protobuf::ManagerResponse handle_request(const protobuf::ManagerRequest &pb_request)
goby::zeromq::InterProcessPortalMainThread::hold_state
bool hold_state()
Definition: interprocess.h:192
goby::zeromq::protobuf::InprocControl::PUB_CONFIGURATION
static const InprocControlType PUB_CONFIGURATION
Definition: interprocess_zeromq.pb.h:948
goby::zeromq::Router::last_port
unsigned last_port(zmq::socket_t &socket)
null.h
goby::middleware::MarshallingScheme::from_string
static int from_string(const std::string &s)
Convert from a string to a marshalling scheme id.
Definition: interface.h:77
goby::middleware::SerializerParserHelper::type_name
static std::string type_name()
The marshalling scheme specific string name for this type.
Definition: interface.h:107
goby::zeromq::IdentifierWildcard::THREAD_WILDCARD
@ THREAD_WILDCARD
goby::zeromq::InterProcessPortalMainThread::recv
bool recv(protobuf::InprocControl *control_msg, zmq_recv_flags_type flags=zmq_recv_flags_type())
goby::zeromq::InterProcessPortalImplementation::ready
void ready()
When using hold functionality, call when the process is ready to receive publications (typically done...
Definition: interprocess.h:314
goby::time::SystemClock::time_point
std::chrono::time_point< SystemClock > time_point
Definition: system_clock.h:55
goby::middleware::MarshallingScheme::to_string
static std::string to_string(int e)
Convert a known marshalling scheme to a human-readable string or an unknown scheme to the string repr...
Definition: interface.h:67
goby::zeromq::InterProcessPortalImplementation::~InterProcessPortalImplementation
~InterProcessPortalImplementation()
Definition: interprocess.h:304
goby::zeromq::InterProcessPortalMainThread::unsubscribe
void unsubscribe(const std::string &identifier)
goby::zeromq::protobuf::Socket
Definition: interprocess_zeromq.pb.h:348
socket_t
int socket_t
Definition: httplib.h:202
goby::zeromq::InterProcessPortalReadThread::run
void run()
to_string
NLOHMANN_BASIC_JSON_TPL_DECLARATION std::string to_string(const NLOHMANN_BASIC_JSON_TPL &j)
user-defined to_string function for JSON values
Definition: json.hpp:24301
interprocess.h
jwt::json::type
type
Generic JSON types used in JWTs.
Definition: jwt.h:2071
flex_ostream.h
goby::msg
extern ::google::protobuf::internal::ExtensionIdentifier< ::google::protobuf::MessageOptions, ::google::protobuf::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
Definition: option_extensions.pb.h:1327
goby::zeromq::InterProcessPortalReadThread::InterProcessPortalReadThread
InterProcessPortalReadThread(const protobuf::InterProcessPortalConfig &cfg, zmq::context_t &context, std::atomic< bool > &alive, std::shared_ptr< std::condition_variable_any > poller_cv)
goby::zeromq::IdentifierWildcard
IdentifierWildcard
Definition: interprocess.h:94
PortalBase
goby::zeromq::zmq_recv_flags_type
int zmq_recv_flags_type
Definition: interprocess.h:161
goby::middleware::Group
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition: group.h:58
goby::zeromq::Router
Definition: interprocess.h:743
goby::zeromq::protobuf::InprocControl
Definition: interprocess_zeromq.pb.h:855
goby::zeromq::InterProcessPortalImplementation::InterProcessPortalImplementation
InterProcessPortalImplementation(InnerTransporter &inner, const protobuf::InterProcessPortalConfig &cfg)
Definition: interprocess.h:293
serializer_transporter.pb.h
goby::zeromq::make_identifier
std::string make_identifier(const std::string &type_name, int scheme, const std::string &group, IdentifierWildcard wildcard, const std::string &process, std::unordered_map< int, std::string > *schemes_buffer=nullptr, std::unordered_map< std::thread::id, std::string > *threads_buffer=nullptr)
Definition: interprocess.h:125
goby::zeromq::Manager::Manager
Manager(zmq::context_t &context, const protobuf::InterProcessPortalConfig &cfg, const Router &router, const protobuf::InterProcessManagerHold &hold)
Definition: interprocess.h:772
common.h
goby::zeromq::Manager::hold_state
bool hold_state()
goby::zeromq::InterProcessPortalMainThread::control_buffer
std::deque< protobuf::InprocControl > & control_buffer()
Definition: interprocess.h:200
goby::zeromq::InterProcessPortalImplementation::hold_state
bool hold_state()
When using hold functionality, returns whether the system is holding (true) and thus waiting for all ...
Definition: interprocess.h:317
interface.h
goby::zeromq::InterProcessPortalMainThread::subscribe
void subscribe(const std::string &identifier)
goby::zeromq::Manager::run
void run()
goby::middleware::scheme
constexpr int scheme()
Placeholder to provide an interface for the scheme() function family.
Definition: cstr.h:65
goby::zeromq::InterProcessPortalImplementation::InterProcessPortalImplementation
InterProcessPortalImplementation(const protobuf::InterProcessPortalConfig &cfg)
Definition: interprocess.h:284
goby::zeromq::Router::pub_port
std::atomic< unsigned > pub_port
Definition: interprocess.h:758
goby::zeromq::protobuf::PROVIDE_HOLD_STATE
@ PROVIDE_HOLD_STATE
Definition: interprocess_zeromq.pb.h:174
goby::zeromq::InterProcessPortalMainThread::subscribe_ready
bool subscribe_ready()
Definition: interprocess.h:185
goby::zeromq::InterProcessPortalImplementation
Definition: interprocess.h:276
goby::zeromq::Manager
Definition: interprocess.h:766
goby::glog
util::FlexOstream glog
Access the Goby logger through this object.
goby::middleware::SerializationHandlerBase::SubscriptionAction::UNSUBSCRIBE
@ UNSUBSCRIBE
goby::zeromq::protobuf::ManagerRequest
Definition: interprocess_zeromq.pb.h:193
serialization_handlers.h
goby::zeromq::protobuf::InterProcessManagerHold
Definition: interprocess_config.pb.h:397
goby::zeromq::protobuf::ManagerResponse
Definition: interprocess_zeromq.pb.h:670
goby::zeromq::Router::run
void run()
goby::zeromq::InterProcessPortalMainThread::reader_shutdown
void reader_shutdown()
goby::zeromq::InterProcessPortalMainThread::publish_ready
bool publish_ready()
Definition: interprocess.h:184
goby::util::FlexOstream::set_lock_action
void set_lock_action(logger_lock::LockAction lock_action)
Definition: flex_ostream.h:147
interprocess_zeromq.pb.h
int