Goby3 3.5.1
2026.06.04
Loading...
Searching...
No Matches
interprocess.h
Go to the documentation of this file.
1// Copyright 2016-2026:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6// Copilot <198982749+Copilot@users.noreply.github.com>
7//
8//
9// This file is part of the Goby Underwater Autonomy Project Libraries
10// ("The Goby Libraries").
11//
12// The Goby Libraries are free software: you can redistribute them and/or modify
13// them under the terms of the GNU Lesser General Public License as published by
14// the Free Software Foundation, either version 2.1 of the License, or
15// (at your option) any later version.
16//
17// The Goby Libraries are distributed in the hope that they will be useful,
18// but WITHOUT ANY WARRANTY; without even the implied warranty of
19// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20// GNU Lesser General Public License for more details.
21//
22// You should have received a copy of the GNU Lesser General Public License
23// along with Goby. If not, see <http://www.gnu.org/licenses/>.
24
25#ifndef GOBY_ZEROMQ_TRANSPORT_INTERPROCESS_H
26#define GOBY_ZEROMQ_TRANSPORT_INTERPROCESS_H
27
29
30#include <atomic> // for atomic
31#include <chrono> // for mill...
32#include <condition_variable> // for cond...
33#include <deque> // for deque
34#include <functional> // for func...
35#include <memory> // for shar...
36#include <mutex> // for time...
37#include <set> // for set
38#include <string> // for string
39#include <thread> // for get_id
40#include <unordered_map> // for unor...
41#include <utility> // for make...
42#include <vector> // for vector
43
44#include <zmq.h> // for ZMQ_...
45#include <zmq.hpp> // for sock...
46
47#include "goby/middleware/common.h" // for thre...
48#include "goby/middleware/group.h" // for Group
49#include "goby/middleware/marshalling/interface.h" // for Seri...
52#include "goby/middleware/transport/interface.h" // for Poll...
53#include "goby/middleware/transport/interprocess.h" // for Inte...
54#include "goby/middleware/transport/null.h" // for Null...
56#include "goby/middleware/transport/subscriber.h" // for Subs...
57#include "goby/time/system_clock.h" // for Syst...
58#include "goby/util/debug_logger/flex_ostream.h" // for Flex...
59#include "goby/zeromq/transport/detail/tags.h" // for InterProcessTag
63
64#if ZMQ_VERSION <= ZMQ_MAKE_VERSION(4, 3, 1)
65#define USE_OLD_ZMQ_CPP_API
66#endif
67
68#if CPPZMQ_VERSION < ZMQ_MAKE_VERSION(4, 7, 1)
69#define USE_OLD_CPPZMQ_SETSOCKOPT
70#endif
71
72#if CPPZMQ_VERSION < ZMQ_MAKE_VERSION(4, 8, 0)
73#define USE_OLD_CPPZMQ_POLL
74#endif
75
76namespace goby
77{
78namespace middleware
79{
80template <typename Data> class Publisher;
81} // namespace middleware
82
83namespace zeromq
84{
85namespace groups
86{
87constexpr goby::middleware::Group manager_request{"goby::zeromq::_internal_manager_request"};
88constexpr goby::middleware::Group manager_response{"goby::zeromq::_internal_manager_response"};
89} // namespace groups
90
91constexpr const char* delimiter_str{"/"};
92
93void setup_socket(zmq::socket_t& socket, const protobuf::Socket& cfg);
94
95#ifdef USE_OLD_ZMQ_CPP_API
98#else
99using zmq_recv_flags_type = zmq::recv_flags;
100using zmq_send_flags_type = zmq::send_flags;
101#endif
102
103// run in the same thread as InterProcessPortal
105{
106 public:
107 InterProcessPortalMainThread(zmq::context_t& context);
109 {
110#ifdef USE_OLD_CPPZMQ_SETSOCKOPT
111 control_socket_.setsockopt(ZMQ_LINGER, 0);
112 publish_socket_.setsockopt(ZMQ_LINGER, 0);
113#else
114 control_socket_.set(zmq::sockopt::linger, 0);
115 publish_socket_.set(zmq::sockopt::linger, 0);
116#endif
117 }
118
119 bool publish_ready() { return !hold_; }
120 bool subscribe_ready() { return have_pubsub_sockets_; }
121
122 bool recv(protobuf::InprocControl* control_msg,
125
126 void set_hold_state(bool hold);
127 bool hold_state() { return hold_; }
128
129 void publish(const std::string& identifier, const char* bytes, int size,
130 bool ignore_buffer = false);
131 void subscribe(const std::string& identifier);
132 void unsubscribe(const std::string& identifier);
134
135 std::deque<protobuf::InprocControl>& control_buffer() { return control_buffer_; }
137
138 private:
139 private:
140 zmq::socket_t control_socket_;
141 zmq::socket_t publish_socket_;
142 bool hold_{true};
143 bool have_pubsub_sockets_{false};
144
145 std::deque<std::pair<std::string, std::vector<char>>>
146 publish_queue_; //used before hold == false
147
148 // buffer messages while waiting for (un)subscribe ack
149 std::deque<protobuf::InprocControl> control_buffer_;
150};
151
152// run in a separate thread to allow zmq_.poll() to block without interrupting the main thread
154{
155 public:
157 zmq::context_t& context, std::atomic<bool>& alive,
158 std::shared_ptr<std::condition_variable> poller_cv);
159 void run();
161 {
162#ifdef USE_OLD_CPPZMQ_SETSOCKOPT
163 control_socket_.setsockopt(ZMQ_LINGER, 0);
164 subscribe_socket_.setsockopt(ZMQ_LINGER, 0);
165 manager_socket_.setsockopt(ZMQ_LINGER, 0);
166#else
167 control_socket_.set(zmq::sockopt::linger, 0);
168 subscribe_socket_.set(zmq::sockopt::linger, 0);
169 manager_socket_.set(zmq::sockopt::linger, 0);
170#endif
171 }
172
173 private:
174 void poll(long timeout_ms = -1);
175 void control_data(const zmq::message_t& zmq_msg);
176 void subscribe_data(const zmq::message_t& zmq_msg);
177 void manager_data(const zmq::message_t& zmq_msg);
178 void send_control_msg(const protobuf::InprocControl& control);
179 void send_manager_request(const protobuf::ManagerRequest& req);
180
181 private:
183 zmq::socket_t control_socket_;
184 zmq::socket_t subscribe_socket_;
185 zmq::socket_t manager_socket_;
186 std::atomic<bool>& alive_;
187 std::shared_ptr<std::condition_variable> poller_cv_;
188 std::vector<zmq::pollitem_t> poll_items_;
189 enum
190 {
191 SOCKET_CONTROL = 0,
192 SOCKET_MANAGER = 1,
193 SOCKET_SUBSCRIBE = 2
194 };
195 enum
196 {
197 NUMBER_SOCKETS = 3
198 };
199 bool have_pubsub_sockets_{false};
200 bool hold_{true};
201 bool manager_waiting_for_reply_{false};
202
203 goby::time::SystemClock::time_point next_hold_state_request_time_{
205 const goby::time::SystemClock::duration hold_state_request_period_{
206 std::chrono::milliseconds(100)};
207};
208
209template <typename InnerTransporter,
210 template <typename Derived, typename InnerTransporterType,
211 typename ImplementationTag_> class PortalBase,
212 typename ImplementationTag>
214 : public PortalBase<InterProcessPortalImplementation<InnerTransporter, PortalBase,
215 ImplementationTag>,
216 InnerTransporter, ImplementationTag>
217{
218 public:
220 ImplementationTag>,
221 InnerTransporter, ImplementationTag>;
223
225 : cfg_(cfg),
226 zmq_context_(cfg.zeromq_number_io_threads()),
227 zmq_main_(zmq_context_),
228 zmq_read_thread_(cfg_, zmq_context_, zmq_alive_, middleware::PollerInterface::cv())
229 {
230 _init();
231 }
232
233 InterProcessPortalImplementation(InnerTransporter& inner,
235 : Base(inner),
236 cfg_(cfg),
237 zmq_context_(cfg.zeromq_number_io_threads()),
238 zmq_main_(zmq_context_),
239 zmq_read_thread_(cfg_, zmq_context_, zmq_alive_, middleware::PollerInterface::cv())
240 {
241 _init();
242 }
243
245 {
246 if (zmq_thread_)
247 {
248 zmq_main_.reader_shutdown();
249 zmq_thread_->join();
250 }
251 }
252
254 void ready() { ready_ = true; }
255
257 bool hold_state() { return zmq_main_.hold_state(); }
258
259 friend Base;
260 friend typename Base::Base;
261 friend typename Base::Common;
262
263 private:
264 void _init()
265 {
267
268 // start zmq read thread
269 zmq_thread_ = std::make_unique<std::thread>([this]() { zmq_read_thread_.run(); });
270
271 while (!zmq_main_.subscribe_ready())
272 {
273 protobuf::InprocControl control_msg;
274 if (zmq_main_.recv(&control_msg))
275 {
276 switch (control_msg.type())
277 {
278 case protobuf::InprocControl::PUB_CONFIGURATION:
279 zmq_main_.set_publish_cfg(control_msg.publish_socket());
280 break;
281 default: break;
282 }
283 }
284 }
285
286 //
287 // Handle hold state request/response using pub sub so that we ensure
288 // publishing and subscribe is completely functional before releasing the hold
289 //
290 this->template _subscribe<protobuf::ManagerResponse,
291 middleware::MarshallingScheme::PROTOBUF>(
292 [this](std::shared_ptr<const protobuf::ManagerResponse> response)
293 {
294 goby::glog.is_debug3() && goby::glog << "Received ManagerResponse: "
295 << response->ShortDebugString() << std::endl;
296 if (response->request() == protobuf::PROVIDE_HOLD_STATE &&
297 response->client_pid() == getpid() &&
298 response->client_name() == cfg_.client_name())
299 {
300 zmq_main_.set_hold_state(response->hold());
301 }
302
303 // we're good to go now, so let's unsubscribe to this group
304 if (zmq_main_.publish_ready())
305 {
306 this->template _unsubscribe<protobuf::ManagerResponse,
307 middleware::MarshallingScheme::PROTOBUF>(
308 groups::manager_response,
310 }
311 },
312 groups::manager_response, middleware::Subscriber<protobuf::ManagerResponse>());
313 }
314
315 void _do_publish(const std::string& identifier, const std::vector<char>& bytes)
316 {
317 zmq_main_.publish(identifier, bytes.data(), bytes.size(), ignore_buffer_);
318 }
319
320 void _do_portal_subscribe(const std::string& identifier) { zmq_main_.subscribe(identifier); }
321 void _do_portal_unsubscribe(const std::string& identifier)
322 {
323 zmq_main_.unsubscribe(identifier);
324 }
325
326 void _do_portal_wildcard_subscribe() { zmq_main_.subscribe(delimiter_str); }
327 void _do_portal_wildcard_unsubscribe() { zmq_main_.unsubscribe(delimiter_str); }
328
329 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock)
330 {
331 int items = 0;
332 protobuf::InprocControl new_control_msg;
333
334#ifdef USE_OLD_ZMQ_CPP_API
335 int flags = ZMQ_NOBLOCK;
336#else
337 auto flags = zmq::recv_flags::dontwait;
338#endif
339
340 while (zmq_main_.recv(&new_control_msg, flags))
341 zmq_main_.control_buffer().push_back(new_control_msg);
342
343 while (!zmq_main_.control_buffer().empty())
344 {
345 const auto& control_msg = zmq_main_.control_buffer().front();
346 switch (control_msg.type())
347 {
348 case protobuf::InprocControl::RECEIVE:
349 {
350 ++items;
351 this->_handle_received_data(lock, control_msg.received_data());
352 }
353 break;
354
355 case protobuf::InprocControl::REQUEST_HOLD_STATE:
356 {
357 protobuf::ManagerRequest req;
358
359 req.set_ready(ready_);
360 req.set_request(protobuf::PROVIDE_HOLD_STATE);
361 req.set_client_name(cfg_.client_name());
362 req.set_client_pid(getpid());
363
364 goby::glog.is_debug3() && goby::glog << "Published ManagerRequest: "
365 << req.ShortDebugString() << std::endl;
366
367 ignore_buffer_ = true;
368 this->template publish<groups::manager_request>(req);
369 ignore_buffer_ = false;
370 }
371 break;
372
373 default: break;
374 }
375 zmq_main_.control_buffer().pop_front();
376 }
377 return items;
378 }
379
380 private:
381 const protobuf::InterProcessPortalConfig cfg_;
382
383 std::unique_ptr<std::thread> zmq_thread_;
384 std::atomic<bool> zmq_alive_{true};
385 zmq::context_t zmq_context_;
386 InterProcessPortalMainThread zmq_main_;
387 InterProcessPortalReadThread zmq_read_thread_;
388
389 bool ready_{false};
390 bool ignore_buffer_{false};
391};
392
394{
395 public:
396 Router(zmq::context_t& context, const protobuf::InterProcessPortalConfig& cfg)
397 : context_(context), cfg_(cfg)
398 {
399 }
400
401 void run();
402 unsigned last_port(zmq::socket_t& socket);
403
404 Router(Router&) = delete;
406
407 public:
408 std::atomic<unsigned> pub_port{0};
409 std::atomic<unsigned> sub_port{0};
410
411 private:
412 zmq::context_t& context_;
414};
415
417{
418 public:
419 Manager(zmq::context_t& context, const protobuf::InterProcessPortalConfig& cfg,
420 const Router& router);
421
422 Manager(zmq::context_t& context, const protobuf::InterProcessPortalConfig& cfg,
423 const Router& router, const protobuf::InterProcessManagerHold& hold)
424 : Manager(context, cfg, router)
425 {
426 for (const auto& req_c : hold.required_client()) required_clients_.insert(req_c);
427 }
428
429 void run();
430
434
436
437 private:
438 std::set<std::string> reported_clients_;
439 std::set<std::string> required_clients_;
440
441 zmq::context_t& context_;
443 const Router& router_;
444
445 std::vector<zmq::pollitem_t> poll_items_;
446 enum
447 {
448 SOCKET_MANAGER = 0,
449 SOCKET_SUBSCRIBE = 1,
450 };
451 enum
452 {
453 NUMBER_SOCKETS = 2
454 };
455
456 std::unique_ptr<zmq::socket_t> manager_socket_;
457 std::unique_ptr<zmq::socket_t> subscribe_socket_;
458 std::unique_ptr<zmq::socket_t> publish_socket_;
459
462 protobuf::ManagerRequest, middleware::scheme<protobuf::ManagerRequest>()>::type_name(),
463 middleware::scheme<protobuf::ManagerRequest>(), groups::manager_request,
465
466 std::string zmq_filter_rep_{
468 middleware::SerializerParserHelper<
469 protobuf::ManagerResponse,
470 middleware::scheme<protobuf::ManagerResponse>()>::type_name(),
471 middleware::scheme<protobuf::ManagerResponse>(), groups::manager_response,
472 middleware::IdentifierWildcard::NO_WILDCARDS, std::to_string(getpid())) +
474}; // namespace zeromq
475
476template <typename InnerTransporter = middleware::NullTransporter>
480
481template <typename InnerTransporter = middleware::NullTransporter>
484
485} // namespace zeromq
486} // namespace goby
487
488#endif
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:61
Implements the forwarder concept for the interprocess layer.
static std::string make_identifier(const std::string &type_name, int scheme, const std::string &group, IdentifierWildcard wildcard, const std::string &process, std::unordered_map< int, std::string > *schemes_buffer=nullptr, std::unordered_map< std::thread::id, std::string > *threads_buffer=nullptr)
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
Definition subscriber.h:37
void set_lock_action(logger_lock::LockAction lock_action)
bool hold_state()
When using hold functionality, returns whether the system is holding (true) and thus waiting for all ...
InterProcessPortalImplementation(InnerTransporter &inner, const protobuf::InterProcessPortalConfig &cfg)
InterProcessPortalImplementation(const protobuf::InterProcessPortalConfig &cfg)
void ready()
When using hold functionality, call when the process is ready to receive publications (typically done...
void set_publish_cfg(const protobuf::Socket &cfg)
void unsubscribe(const std::string &identifier)
std::deque< protobuf::InprocControl > & control_buffer()
InterProcessPortalMainThread(zmq::context_t &context)
void subscribe(const std::string &identifier)
void publish(const std::string &identifier, const char *bytes, int size, bool ignore_buffer=false)
void send_control_msg(const protobuf::InprocControl &control)
bool recv(protobuf::InprocControl *control_msg, zmq_recv_flags_type flags=zmq_recv_flags_type())
InterProcessPortalReadThread(const protobuf::InterProcessPortalConfig &cfg, zmq::context_t &context, std::atomic< bool > &alive, std::shared_ptr< std::condition_variable > poller_cv)
Manager(zmq::context_t &context, const protobuf::InterProcessPortalConfig &cfg, const Router &router, const protobuf::InterProcessManagerHold &hold)
protobuf::Socket subscribe_socket_cfg()
Manager(zmq::context_t &context, const protobuf::InterProcessPortalConfig &cfg, const Router &router)
protobuf::Socket publish_socket_cfg()
protobuf::ManagerResponse handle_request(const protobuf::ManagerRequest &pb_request)
Router & operator=(Router &)=delete
Router(Router &)=delete
unsigned last_port(zmq::socket_t &socket)
Router(zmq::context_t &context, const protobuf::InterProcessPortalConfig &cfg)
::goby::zeromq::protobuf::InprocControl_InprocControlType type() const
const ::goby::zeromq::protobuf::Socket & publish_socket() const
const std::string & required_client(int index) const
constexpr goby::middleware::Group manager_response
constexpr goby::middleware::Group manager_request
constexpr const char * delimiter_str
int zmq_send_flags_type
int zmq_recv_flags_type
void setup_socket(zmq::socket_t &socket, const protobuf::Socket &cfg)
The global namespace for the Goby project.
util::FlexOstream glog
Access the Goby logger through this object.
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
Definition interface.h:98
std::chrono::time_point< SystemClock > time_point
static time_point now() noexcept
Returns the current system time unless SimulatorSettings::using_sim_time is set to true,...
std::chrono::microseconds duration
Duration type.
ImplementationTag for zeromq interprocess transporters.
Definition tags.h:36