Goby3 3.4.0
2026.04.13
Loading...
Searching...
No Matches
interprocess.h
Go to the documentation of this file.
1// Copyright 2016-2026:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6//
7//
8// This file is part of the Goby Underwater Autonomy Project Libraries
9// ("The Goby Libraries").
10//
11// The Goby Libraries are free software: you can redistribute them and/or modify
12// them under the terms of the GNU Lesser General Public License as published by
13// the Free Software Foundation, either version 2.1 of the License, or
14// (at your option) any later version.
15//
16// The Goby Libraries are distributed in the hope that they will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU Lesser General Public License for more details.
20//
21// You should have received a copy of the GNU Lesser General Public License
22// along with Goby. If not, see <http://www.gnu.org/licenses/>.
23
24#ifndef GOBY_ZEROMQ_TRANSPORT_INTERPROCESS_H
25#define GOBY_ZEROMQ_TRANSPORT_INTERPROCESS_H
26
28
29#include <atomic> // for atomic
30#include <chrono> // for mill...
31#include <condition_variable> // for cond...
32#include <deque> // for deque
33#include <functional> // for func...
34#include <memory> // for shar...
35#include <mutex> // for time...
36#include <set> // for set
37#include <string> // for string
38#include <thread> // for get_id
39#include <unordered_map> // for unor...
40#include <utility> // for make...
41#include <vector> // for vector
42
43#include <zmq.h> // for ZMQ_...
44#include <zmq.hpp> // for sock...
45
46#include "goby/middleware/common.h" // for thre...
47#include "goby/middleware/group.h" // for Group
48#include "goby/middleware/marshalling/interface.h" // for Seri...
51#include "goby/middleware/transport/interface.h" // for Poll...
52#include "goby/middleware/transport/interprocess.h" // for Inte...
53#include "goby/middleware/transport/null.h" // for Null...
55#include "goby/middleware/transport/subscriber.h" // for Subs...
56#include "goby/time/system_clock.h" // for Syst...
57#include "goby/util/debug_logger/flex_ostream.h" // for Flex...
58#include "goby/zeromq/transport/detail/tags.h" // for InterProcessTag
62
63#if ZMQ_VERSION <= ZMQ_MAKE_VERSION(4, 3, 1)
64#define USE_OLD_ZMQ_CPP_API
65#endif
66
67#if CPPZMQ_VERSION < ZMQ_MAKE_VERSION(4, 7, 1)
68#define USE_OLD_CPPZMQ_SETSOCKOPT
69#endif
70
71#if CPPZMQ_VERSION < ZMQ_MAKE_VERSION(4, 8, 0)
72#define USE_OLD_CPPZMQ_POLL
73#endif
74
75namespace goby
76{
77namespace middleware
78{
79template <typename Data> class Publisher;
80} // namespace middleware
81
82namespace zeromq
83{
84namespace groups
85{
// Internal groups used for the portal <-> manager "hold state" handshake:
// portals publish ManagerRequests to manager_request and the manager publishes
// ManagerResponses to manager_response (see _init()/_poll() below).
86constexpr goby::middleware::Group manager_request{"goby::zeromq::_internal_manager_request"};
87constexpr goby::middleware::Group manager_response{"goby::zeromq::_internal_manager_response"};
88} // namespace groups
89
// Separator used when composing zmq identifier strings; also used on its own
// as the subscription prefix for wildcard subscriptions (see
// _do_portal_wildcard_subscribe below).
90constexpr const char* delimiter_str{"/"};
91
// Applies the settings in 'cfg' to 'socket' (defined in the corresponding .cpp).
92void setup_socket(zmq::socket_t& socket, const protobuf::Socket& cfg);
93
94#ifdef USE_OLD_ZMQ_CPP_API
// NOTE(review): original lines 95-96 (the pre-4.3.1 'int'-based aliases for
// zmq_recv_flags_type / zmq_send_flags_type, per this page's index) were
// dropped by the extraction.
97#else
// Newer cppzmq: strongly-typed flag enums.
98using zmq_recv_flags_type = zmq::recv_flags;
99using zmq_send_flags_type = zmq::send_flags;
100#endif
101
102// run in the same thread as InterProcessPortal
// NOTE(review): this is a rendered source listing with dropped lines; original
// line 103 ("class InterProcessPortalMainThread") is missing here. The class
// owns the sockets used from the portal's own thread (an inproc control socket
// to the read thread and the publish socket).
104{
105  public:
106    InterProcessPortalMainThread(zmq::context_t& context);
// NOTE(review): line 107 (the destructor declaration) is missing from this
// extraction; the body below sets ZMQ_LINGER to 0 so close() does not block
// on undelivered messages.
108    {
109#ifdef USE_OLD_CPPZMQ_SETSOCKOPT
110        control_socket_.setsockopt(ZMQ_LINGER, 0);
111        publish_socket_.setsockopt(ZMQ_LINGER, 0);
112#else
113        control_socket_.set(zmq::sockopt::linger, 0);
114        publish_socket_.set(zmq::sockopt::linger, 0);
115#endif
116    }
117
    // ready to publish once the manager has released the hold
118    bool publish_ready() { return !hold_; }
    // ready to subscribe once the pub/sub socket configuration has been received
119    bool subscribe_ready() { return have_pubsub_sockets_; }
120
121    bool recv(protobuf::InprocControl* control_msg,
// NOTE(review): lines 122-123 (the 'flags' parameter; per this page's index
// the full signature is "recv(protobuf::InprocControl* control_msg,
// zmq_recv_flags_type flags = zmq_recv_flags_type())") are missing from this
// extraction.
124
125    void set_hold_state(bool hold);
126    bool hold_state() { return hold_; }
127
128    void publish(const std::string& identifier, const char* bytes, int size,
129                 bool ignore_buffer = false);
130    void subscribe(const std::string& identifier);
131    void unsubscribe(const std::string& identifier);
// NOTE(review): line 132 (set_publish_cfg(const protobuf::Socket&), per this
// page's index; called from _init() below) is missing from this extraction.
133
    // buffered control messages; drained by the portal's _poll()
134    std::deque<protobuf::InprocControl>& control_buffer() { return control_buffer_; }
// NOTE(review): line 135 (reader_shutdown(), called from the portal destructor
// below) is missing from this extraction.
136
137  private:
// NOTE(review): duplicated access specifier in the original source; redundant
// but harmless.
138  private:
139    zmq::socket_t control_socket_;
140    zmq::socket_t publish_socket_;
    // hold_ starts true: publications are queued until the manager releases us
141    bool hold_{true};
142    bool have_pubsub_sockets_{false};
143
144    std::deque<std::pair<std::string, std::vector<char>>>
145        publish_queue_; //used before hold == false
146
147    // buffer messages while waiting for (un)subscribe ack
148    std::deque<protobuf::InprocControl> control_buffer_;
149};
150
151// run in a separate thread to allow zmq_.poll() to block without interrupting the main thread
// NOTE(review): original line 152 ("class InterProcessPortalReadThread") and
// line 155 (the first line of the constructor declaration; per this page's
// index it takes const protobuf::InterProcessPortalConfig& cfg first) are
// missing from this extraction.
153{
154  public:
156                                zmq::context_t& context, std::atomic<bool>& alive,
157                                std::shared_ptr<std::condition_variable> poller_cv);
    // main loop: polls the control/manager/subscribe sockets while alive_
158    void run();
// NOTE(review): line 159 (the destructor declaration) is missing from this
// extraction; the body below sets ZMQ_LINGER to 0 so close() does not block.
160    {
161#ifdef USE_OLD_CPPZMQ_SETSOCKOPT
162        control_socket_.setsockopt(ZMQ_LINGER, 0);
163        subscribe_socket_.setsockopt(ZMQ_LINGER, 0);
164        manager_socket_.setsockopt(ZMQ_LINGER, 0);
165#else
166        control_socket_.set(zmq::sockopt::linger, 0);
167        subscribe_socket_.set(zmq::sockopt::linger, 0);
168        manager_socket_.set(zmq::sockopt::linger, 0);
169#endif
170    }
171
172  private:
    // negative timeout_ms blocks until a socket is readable
173    void poll(long timeout_ms = -1);
    // per-socket handlers dispatched from poll()
174    void control_data(const zmq::message_t& zmq_msg);
175    void subscribe_data(const zmq::message_t& zmq_msg);
176    void manager_data(const zmq::message_t& zmq_msg);
    // send a control message back to the main thread over the inproc socket
177    void send_control_msg(const protobuf::InprocControl& control);
178    void send_manager_request(const protobuf::ManagerRequest& req);
179
180  private:
// NOTE(review): line 181 is missing from this extraction — presumably the
// stored copy of the config passed to the constructor; TODO confirm against
// the original header.
182    zmq::socket_t control_socket_;
183    zmq::socket_t subscribe_socket_;
184    zmq::socket_t manager_socket_;
185    std::atomic<bool>& alive_;
186    std::shared_ptr<std::condition_variable> poller_cv_;
187    std::vector<zmq::pollitem_t> poll_items_;
    // indices into poll_items_
188    enum
189    {
190        SOCKET_CONTROL = 0,
191        SOCKET_MANAGER = 1,
192        SOCKET_SUBSCRIBE = 2
193    };
194    enum
195    {
196        NUMBER_SOCKETS = 3
197    };
198    bool have_pubsub_sockets_{false};
199    bool hold_{true};
200    bool manager_waiting_for_reply_{false};
201
    // rate-limit hold-state requests to the manager (period below: 100 ms)
202    goby::time::SystemClock::time_point next_hold_state_request_time_{
// NOTE(review): line 203 (the initializer for the member above) is missing
// from this extraction.
204    const goby::time::SystemClock::duration hold_state_request_period_{
205        std::chrono::milliseconds(100)};
206};
207
208template <typename InnerTransporter,
209          template <typename Derived, typename InnerTransporterType,
210                    typename ImplementationTag_> class PortalBase,
211          typename ImplementationTag>
// NOTE(review): original line 212 ("class InterProcessPortalImplementation")
// is missing from this extraction; the CRTP base clause follows.
213    : public PortalBase<InterProcessPortalImplementation<InnerTransporter, PortalBase,
214                                                        ImplementationTag>,
215                        InnerTransporter, ImplementationTag>
216{
217  public:
// NOTE(review): line 218 (the first line of the "using Base = PortalBase<..."
// alias continued below) is missing from this extraction.
219                          ImplementationTag>,
220                          InnerTransporter, ImplementationTag>;
// NOTE(review): line 221 is missing from this extraction.
222
// NOTE(review): line 223 (the first constructor's signature, taking
// const protobuf::InterProcessPortalConfig& cfg per this page's index) is
// missing from this extraction.
224        : cfg_(cfg),
225          zmq_context_(cfg.zeromq_number_io_threads()),
226          zmq_main_(zmq_context_),
227          zmq_read_thread_(cfg_, zmq_context_, zmq_alive_, middleware::PollerInterface::cv())
228    {
229        _init();
230    }
231
    // variant wrapping an existing inner transporter
232    InterProcessPortalImplementation(InnerTransporter& inner,
// NOTE(review): line 233 (the second parameter line,
// "const protobuf::InterProcessPortalConfig& cfg)" per this page's index) is
// missing from this extraction.
234        : Base(inner),
235          cfg_(cfg),
236          zmq_context_(cfg.zeromq_number_io_threads()),
237          zmq_main_(zmq_context_),
238          zmq_read_thread_(cfg_, zmq_context_, zmq_alive_, middleware::PollerInterface::cv())
239    {
240        _init();
241    }
242
// NOTE(review): line 243 (the destructor's signature) is missing from this
// extraction; the body below shuts down the reader and joins the zmq thread.
244    {
245        if (zmq_thread_)
246        {
247            zmq_main_.reader_shutdown();
248            zmq_thread_->join();
249        }
250    }
251
    // When using hold functionality, call when the process is ready to
    // receive publications (reported to the manager in _poll() below).
253    void ready() { ready_ = true; }
254
    // true while the system is held waiting for all required clients
256    bool hold_state() { return zmq_main_.hold_state(); }
257
258    friend Base;
259    friend typename Base::Base;
260    friend typename Base::Common;
261
262  private:
    // Starts the read thread, waits for the pub/sub socket configuration,
    // then subscribes to the manager's hold-state responses.
263    void _init()
264    {
// NOTE(review): line 265 is missing from this extraction.
266
267        // start zmq read thread
268        zmq_thread_ = std::make_unique<std::thread>([this]() { zmq_read_thread_.run(); });
269
        // block until the read thread reports the pub/sub socket configuration
270        while (!zmq_main_.subscribe_ready())
271        {
272            protobuf::InprocControl control_msg;
273            if (zmq_main_.recv(&control_msg))
274            {
275                switch (control_msg.type())
276                {
277                    case protobuf::InprocControl::PUB_CONFIGURATION:
278                        zmq_main_.set_publish_cfg(control_msg.publish_socket());
279                        break;
280                    default: break;
281                }
282            }
283        }
284
285        //
286        // Handle hold state request/response using pub sub so that we ensure
287        // publishing and subscribe is completely functional before releasing the hold
288        //
289        this->template _subscribe<protobuf::ManagerResponse,
290                                  middleware::MarshallingScheme::PROTOBUF>(
291            [this](std::shared_ptr<const protobuf::ManagerResponse> response)
292            {
293                goby::glog.is_debug3() && goby::glog << "Received ManagerResponse: "
294                                                     << response->ShortDebugString() << std::endl;
                // only act on responses addressed to this pid + client name
295                if (response->request() == protobuf::PROVIDE_HOLD_STATE &&
296                    response->client_pid() == getpid() &&
297                    response->client_name() == cfg_.client_name())
298                {
299                    zmq_main_.set_hold_state(response->hold());
300                }
301
302                // we're good to go now, so let's unsubscribe to this group
303                if (zmq_main_.publish_ready())
304                {
305                    this->template _unsubscribe<protobuf::ManagerResponse,
306                                                middleware::MarshallingScheme::PROTOBUF>(
307                        groups::manager_response,
// NOTE(review): line 308 (the trailing argument(s) of this _unsubscribe call)
// is missing from this extraction.
309                }
310            },
311            groups::manager_response, middleware::Subscriber<protobuf::ManagerResponse>());
312    }
313
    // forward a serialized message to the zmq publish socket
314    void _do_publish(const std::string& identifier, const std::vector<char>& bytes)
315    {
316        zmq_main_.publish(identifier, &bytes[0], bytes.size(), ignore_buffer_);
317    }
318
319    void _do_portal_subscribe(const std::string& identifier) { zmq_main_.subscribe(identifier); }
320    void _do_portal_unsubscribe(const std::string& identifier)
321    {
322        zmq_main_.unsubscribe(identifier);
323    }
324
    // wildcard (un)subscribe: match everything via the identifier delimiter prefix
325    void _do_portal_wildcard_subscribe() { zmq_main_.subscribe(delimiter_str); }
326    void _do_portal_wildcard_unsubscribe() { zmq_main_.unsubscribe(delimiter_str); }
327
    // Drains buffered control messages; returns the number of data messages
    // handled. Also answers REQUEST_HOLD_STATE by publishing a ManagerRequest.
328    int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock)
329    {
330        int items = 0;
331        protobuf::InprocControl new_control_msg;
332
333#ifdef USE_OLD_ZMQ_CPP_API
334        int flags = ZMQ_NOBLOCK;
335#else
336        auto flags = zmq::recv_flags::dontwait;
337#endif
338
        // non-blocking drain of the inproc control socket into the buffer
339        while (zmq_main_.recv(&new_control_msg, flags))
340            zmq_main_.control_buffer().push_back(new_control_msg);
341
342        while (!zmq_main_.control_buffer().empty())
343        {
344            const auto& control_msg = zmq_main_.control_buffer().front();
345            switch (control_msg.type())
346            {
347                case protobuf::InprocControl::RECEIVE:
348                {
349                    ++items;
350                    this->_handle_received_data(lock, control_msg.received_data());
351                }
352                break;
353
354                case protobuf::InprocControl::REQUEST_HOLD_STATE:
355                {
356                    protobuf::ManagerRequest req;
357
358                    req.set_ready(ready_);
359                    req.set_request(protobuf::PROVIDE_HOLD_STATE);
360                    req.set_client_name(cfg_.client_name());
361                    req.set_client_pid(getpid());
362
363                    goby::glog.is_debug3() && goby::glog << "Published ManagerRequest: "
364                                                         << req.ShortDebugString() << std::endl;
365
                    // bypass the hold-time publish queue for the handshake itself
366                    ignore_buffer_ = true;
367                    this->template publish<groups::manager_request>(req);
368                    ignore_buffer_ = false;
369                }
370                break;
371
372                default: break;
373            }
374            zmq_main_.control_buffer().pop_front();
375        }
376        return items;
377    }
378
379  private:
380    const protobuf::InterProcessPortalConfig cfg_;
381
382    std::unique_ptr<std::thread> zmq_thread_;
383    std::atomic<bool> zmq_alive_{true};
    // declaration order matters: zmq_context_ is initialized before the
    // objects below that are constructed from it
384    zmq::context_t zmq_context_;
385    InterProcessPortalMainThread zmq_main_;
386    InterProcessPortalReadThread zmq_read_thread_;
387
388    bool ready_{false};
389    bool ignore_buffer_{false};
390};
391
// NOTE(review): original line 392 ("class Router") is missing from this
// extraction. Router forwards messages between publishers and subscribers and
// exposes the ports it bound.
393{
394  public:
395    Router(zmq::context_t& context, const protobuf::InterProcessPortalConfig& cfg)
396        : context_(context), cfg_(cfg)
397    {
398    }
399
    // main loop (defined in the .cpp)
400    void run();
    // port that 'socket' last bound (defined in the .cpp)
401    unsigned last_port(zmq::socket_t& socket);
402
    // non-copyable (holds a context reference)
403    Router(Router&) = delete;
// NOTE(review): line 404 ("Router& operator=(Router&) = delete;" per this
// page's index) is missing from this extraction.
405
406  public:
    // ports chosen at bind time; atomics so other threads can read them
407    std::atomic<unsigned> pub_port{0};
408    std::atomic<unsigned> sub_port{0};
409
410  private:
411    zmq::context_t& context_;
// NOTE(review): line 412 (the stored cfg_ member initialized by the
// constructor above) is missing from this extraction.
413};
414
// NOTE(review): original line 415 ("class Manager") is missing from this
// extraction. The Manager answers portal hold-state requests and tracks which
// required clients have reported ready.
416{
417  public:
418    Manager(zmq::context_t& context, const protobuf::InterProcessPortalConfig& cfg,
419            const Router& router);
420
    // variant that seeds the set of clients that must report ready before
    // the hold is released
421    Manager(zmq::context_t& context, const protobuf::InterProcessPortalConfig& cfg,
422            const Router& router, const protobuf::InterProcessManagerHold& hold)
423        : Manager(context, cfg, router)
424    {
425        for (const auto& req_c : hold.required_client()) required_clients_.insert(req_c);
426    }
427
    // main loop (defined in the .cpp)
428    void run();
429
// NOTE(review): lines 430-432 and 434 (handle_request(), publish_socket_cfg(),
// subscribe_socket_cfg() per this page's index) are missing from this
// extraction.
433
435
436  private:
437    std::set<std::string> reported_clients_;
438    std::set<std::string> required_clients_;
439
440    zmq::context_t& context_;
// NOTE(review): line 441 (presumably the stored config member) is missing
// from this extraction — TODO confirm against the original header.
442    const Router& router_;
443
444    std::vector<zmq::pollitem_t> poll_items_;
    // indices into poll_items_
445    enum
446    {
447        SOCKET_MANAGER = 0,
448        SOCKET_SUBSCRIBE = 1,
449    };
450    enum
451    {
452        NUMBER_SOCKETS = 2
453    };
454
455    std::unique_ptr<zmq::socket_t> manager_socket_;
456    std::unique_ptr<zmq::socket_t> subscribe_socket_;
457    std::unique_ptr<zmq::socket_t> publish_socket_;
458
// NOTE(review): lines 459-460 (the start of the zmq_filter_req_ member, a
// make_identifier(...) subscription filter for ManagerRequest) are missing
// from this extraction; the continuation lines follow.
461            protobuf::ManagerRequest, middleware::scheme<protobuf::ManagerRequest>()>::type_name(),
462        middleware::scheme<protobuf::ManagerRequest>(), groups::manager_request,
464
    // subscription filter for ManagerResponses addressed to this process
    // (NO_WILDCARDS + our pid)
465    std::string zmq_filter_rep_{
// NOTE(review): line 466 (the start of the make_identifier(...) call) is
// missing from this extraction.
467            middleware::SerializerParserHelper<
468                protobuf::ManagerResponse,
469                middleware::scheme<protobuf::ManagerResponse>()>::type_name(),
470            middleware::scheme<protobuf::ManagerResponse>(), groups::manager_response,
471            middleware::IdentifierWildcard::NO_WILDCARDS, std::to_string(getpid())) +
// NOTE(review): line 472 (the trailing appended string finishing this
// initializer) is missing from this extraction; the brace below closes the
// Manager class — the original trailing "// namespace zeromq" comment here
// appears to be an extraction artifact.
473}; // class Manager
474
475template <typename InnerTransporter = middleware::NullTransporter>
// NOTE(review): lines 476-478 (the "using InterProcessPortal = ..." alias this
// template parameter list introduces, binding InterProcessPortalImplementation
// to a portal base) are missing from this extraction.
479
480template <typename InnerTransporter = middleware::NullTransporter>
// NOTE(review): lines 481-482 (the second portal alias declaration) are
// missing from this extraction.
483
484} // namespace zeromq
485} // namespace goby
486
487#endif
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:60
Implements the forwarder concept for the interprocess layer.
static std::string make_identifier(const std::string &type_name, int scheme, const std::string &group, IdentifierWildcard wildcard, const std::string &process, std::unordered_map< int, std::string > *schemes_buffer=nullptr, std::unordered_map< std::thread::id, std::string > *threads_buffer=nullptr)
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
Definition subscriber.h:37
void set_lock_action(logger_lock::LockAction lock_action)
bool hold_state()
When using hold functionality, returns whether the system is holding (true) and thus waiting for all ...
InterProcessPortalImplementation(InnerTransporter &inner, const protobuf::InterProcessPortalConfig &cfg)
InterProcessPortalImplementation(const protobuf::InterProcessPortalConfig &cfg)
void ready()
When using hold functionality, call when the process is ready to receive publications (typically done...
void set_publish_cfg(const protobuf::Socket &cfg)
void unsubscribe(const std::string &identifier)
std::deque< protobuf::InprocControl > & control_buffer()
InterProcessPortalMainThread(zmq::context_t &context)
void subscribe(const std::string &identifier)
void publish(const std::string &identifier, const char *bytes, int size, bool ignore_buffer=false)
void send_control_msg(const protobuf::InprocControl &control)
bool recv(protobuf::InprocControl *control_msg, zmq_recv_flags_type flags=zmq_recv_flags_type())
InterProcessPortalReadThread(const protobuf::InterProcessPortalConfig &cfg, zmq::context_t &context, std::atomic< bool > &alive, std::shared_ptr< std::condition_variable > poller_cv)
Manager(zmq::context_t &context, const protobuf::InterProcessPortalConfig &cfg, const Router &router, const protobuf::InterProcessManagerHold &hold)
protobuf::Socket subscribe_socket_cfg()
Manager(zmq::context_t &context, const protobuf::InterProcessPortalConfig &cfg, const Router &router)
protobuf::Socket publish_socket_cfg()
protobuf::ManagerResponse handle_request(const protobuf::ManagerRequest &pb_request)
Router & operator=(Router &)=delete
Router(Router &)=delete
unsigned last_port(zmq::socket_t &socket)
Router(zmq::context_t &context, const protobuf::InterProcessPortalConfig &cfg)
::goby::zeromq::protobuf::InprocControl_InprocControlType type() const
const ::goby::zeromq::protobuf::Socket & publish_socket() const
const std::string & required_client(int index) const
constexpr goby::middleware::Group manager_response
constexpr goby::middleware::Group manager_request
constexpr const char * delimiter_str
int zmq_send_flags_type
int zmq_recv_flags_type
void setup_socket(zmq::socket_t &socket, const protobuf::Socket &cfg)
The global namespace for the Goby project.
util::FlexOstream glog
Access the Goby logger through this object.
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
Definition interface.h:98
std::chrono::time_point< SystemClock > time_point
static time_point now() noexcept
Returns the current system time unless SimulatorSettings::using_sim_time is set to true,...
std::chrono::microseconds duration
Duration type.
ImplementationTag for zeromq interprocess transporters.
Definition tags.h:36