Goby3 3.4.0
2026.04.13
Loading...
Searching...
No Matches
interprocess.h
Go to the documentation of this file.
1// Copyright 2026:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6//
7//
8// This file is part of the Goby Underwater Autonomy Project Libraries
9// ("The Goby Libraries").
10//
11// The Goby Libraries are free software: you can redistribute them and/or modify
12// them under the terms of the GNU Lesser General Public License as published by
13// the Free Software Foundation, either version 2.1 of the License, or
14// (at your option) any later version.
15//
16// The Goby Libraries are distributed in the hope that they will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU Lesser General Public License for more details.
20//
21// You should have received a copy of the GNU Lesser General Public License
22// along with Goby. If not, see <http://www.gnu.org/licenses/>.
23
24#ifndef GOBY_UDPM_TRANSPORT_INTERPROCESS_H
25#define GOBY_UDPM_TRANSPORT_INTERPROCESS_H
26
27#include <arpa/inet.h>
28#include <array>
29#include <boost/asio/buffer.hpp>
30#include <boost/asio/executor_work_guard.hpp>
31#include <boost/asio/ip/multicast.hpp>
32#include <boost/asio/ip/udp.hpp>
33#include <boost/asio/post.hpp>
34#include <boost/asio/socket_base.hpp>
35#include <boost/asio/steady_timer.hpp>
36#include <boost/circular_buffer.hpp>
37#include <chrono>
38#include <cstring>
39#include <deque>
40#include <memory>
41#include <mutex>
42#include <set>
43#include <thread>
44#include <unordered_map>
45
50
53
54namespace goby
55{
56namespace middleware
57{
58template <typename Data> class Publisher;
59} // namespace middleware
60
61namespace udpm
62{
63
// Value of the one-byte status field that ends every UDPM packet header.
enum class UDPMPacketStatus : uint8_t
{
    // The only status value defined in this file.
    NORMAL = 0,
};
69
// Size in bytes of the fixed packet header that follows the null-terminated
// identifier. Fields, in wire order (9 bytes total):
//   message_index : uint32 (network byte order)
//   num_packets   : uint16 (network byte order)
//   packet_count  : uint16 (network byte order)
//   status        : uint8
static constexpr std::size_t UDPM_PACKET_HEADER_SIZE =
    sizeof(uint32_t) + 2 * sizeof(uint16_t) + sizeof(uint8_t);
77
85
86inline void encode_header(char* buf, const UDPMPacketHeader& h)
87{
88 uint32_t mi = htonl(h.message_index);
89 uint16_t np = htons(h.num_packets);
90 uint16_t pc = htons(h.packet_count);
91 std::memcpy(buf, &mi, 4);
92 std::memcpy(buf + 4, &np, 2);
93 std::memcpy(buf + 6, &pc, 2);
94 buf[8] = static_cast<uint8_t>(h.status);
95}
96
97inline UDPMPacketHeader decode_header(const char* buf)
98{
100 uint32_t mi;
101 uint16_t np, pc;
102 std::memcpy(&mi, buf, 4);
103 std::memcpy(&np, buf + 4, 2);
104 std::memcpy(&pc, buf + 6, 2);
105 h.message_index = ntohl(mi);
106 h.num_packets = ntohs(np);
107 h.packet_count = ntohs(pc);
108 h.status = static_cast<UDPMPacketStatus>(static_cast<uint8_t>(buf[8]));
109 return h;
110}
111
112template <typename InnerTransporter,
113 template <typename Derived, typename InnerTransporterType,
114 typename ImplementationTag_> class PortalBase,
115 typename ImplementationTag>
117 : public PortalBase<InterProcessPortalImplementation<InnerTransporter, PortalBase,
118 ImplementationTag>,
119 InnerTransporter, ImplementationTag>
120{
121 public:
123 ImplementationTag>,
124 InnerTransporter, ImplementationTag>;
125
127
129 {
130 _init();
131 }
132
133 InterProcessPortalImplementation(InnerTransporter& inner,
135 : Base(inner), cfg_(cfg)
136 {
137 _init();
138 }
139
141 {
142 // Close socket first to cancel any pending async operations,
143 // then release the work guard so io_.ctx.run() can return, then join.
144 io_.socket.close();
145 io_.work.reset();
146 if (io_thread_.joinable())
147 io_thread_.join();
148 }
149
// UDPM implements no hold, so signalling readiness does nothing...
void ready() {}

// ...and the hold state is always reported as false.
bool hold_state()
{
    return false;
}
153
154 friend Base;
155 friend typename Base::Base;
156 friend typename Base::Common;
157
158 private:
159 // -----------------------------------------------------------------------
160 // Called from main thread before io_thread_ starts; sets up socket and
161 // launches io_thread_.
162 // -----------------------------------------------------------------------
163 void _init()
164 {
166
167 boost::asio::ip::address listen_address =
168 boost::asio::ip::make_address(cfg_.listen_address());
169 boost::asio::ip::address multicast_address =
170 boost::asio::ip::make_address(cfg_.multicast_address());
171 short multicast_port = static_cast<short>(cfg_.multicast_port());
172
173 boost::asio::ip::udp::endpoint listen_endpoint(listen_address, multicast_port);
174 io_.transmit_endpoint = boost::asio::ip::udp::endpoint(multicast_address, multicast_port);
175
176 io_.socket.open(listen_endpoint.protocol());
177 io_.socket.set_option(boost::asio::ip::udp::socket::reuse_address(true));
178 io_.socket.bind(listen_endpoint);
179 io_.socket.set_option(boost::asio::ip::multicast::join_group(multicast_address));
180
181 // Arm the first async receive before starting io_thread_ so the
182 // operation is already queued when io_.ctx.run() is called.
183 _start_async_receive();
184 io_thread_ = std::thread([this]() { io_.ctx.run(); });
185 }
186
187 // -----------------------------------------------------------------------
188 // Runs on io_thread_: arms (or re-arms) a single async datagram receive.
189 // Called once from _init() (main thread, before io_thread_ starts) and
190 // then re-posted from within the completion handler on io_thread_.
191 // -----------------------------------------------------------------------
192 void _start_async_receive()
193 {
194 io_.socket.async_receive_from(
195 boost::asio::buffer(io_.rx_buffer), io_.sender_endpoint,
196 [this](boost::system::error_code ec, std::size_t length)
197 {
198 if (!ec)
199 {
200 {
201 std::lock_guard<std::mutex> l(rx_mutex_);
202 rx_.push_back(std::string(io_.rx_buffer.begin(),
203 io_.rx_buffer.begin() + length));
204 }
205 // Acquire poll_mutex briefly to ensure the main thread is not in the
206 // limbo region between _poll_all() releasing the lock and calling
207 // cv_.wait(). Without this, notify_all() could be missed.
208 {
209 std::lock_guard<std::mutex> l(*this->poll_mutex());
210 }
211 this->cv()->notify_all();
212 _start_async_receive();
213 }
214 });
215 }
216
217 // -----------------------------------------------------------------------
218 // Main thread helpers
219 // -----------------------------------------------------------------------
220
221 std::shared_ptr<std::vector<char>> _build_packet(const std::string& identifier,
222 const UDPMPacketHeader& hdr,
223 const char* data_begin, std::size_t data_len)
224 {
225 auto pkt = std::make_shared<std::vector<char>>();
226 pkt->reserve(identifier.size() + UDPM_PACKET_HEADER_SIZE + data_len);
227 pkt->insert(pkt->end(), identifier.begin(), identifier.end());
228 char hdr_buf[UDPM_PACKET_HEADER_SIZE];
229 encode_header(hdr_buf, hdr);
230 pkt->insert(pkt->end(), hdr_buf, hdr_buf + UDPM_PACKET_HEADER_SIZE);
231 if (data_len > 0)
232 pkt->insert(pkt->end(), data_begin, data_begin + data_len);
233 return pkt;
234 }
235
236 void _async_send(std::shared_ptr<std::vector<char>> pkt)
237 {
238 if (cfg_.max_send_rate_bytes_per_second() > 0)
239 {
240 auto now = std::chrono::steady_clock::now();
241
242 std::this_thread::sleep_until(next_send_time_);
243
244 double delay_sec = static_cast<double>(pkt->size()) /
245 static_cast<double>(cfg_.max_send_rate_bytes_per_second());
246
247 if (static_cast<long>(delay_sec * 1.0e9) < std::numeric_limits<long>::max())
248 next_send_time_ =
249 now + std::chrono::nanoseconds(static_cast<long>(1e9 * delay_sec));
250 else if (static_cast<long>(delay_sec * 1.0e6) < std::numeric_limits<long>::max())
251
252 next_send_time_ =
253 now + std::chrono::microseconds(static_cast<long>(1e6 * delay_sec));
254 else
255 next_send_time_ =
256 now + std::chrono::milliseconds(static_cast<long>(1e3 * delay_sec));
257 }
258
259 _do_socket_send(std::move(pkt));
260 }
261
262 void _do_socket_send(std::shared_ptr<std::vector<char>> pkt)
263 {
264 // Post the actual socket write to io_thread_ so that the socket is
265 // only ever touched from io_thread_.
266 boost::asio::post(io_.ctx,
267 [this, pkt = std::move(pkt)]()
268 {
269 io_.socket.async_send_to(
270 boost::asio::buffer(*pkt), io_.transmit_endpoint,
271 [pkt](boost::system::error_code ec, std::size_t length)
272 {
273 if (!ec)
274 goby::glog.is_debug3() &&
275 goby::glog << "UDPM: Sent " << length << "B"
276 << std::endl;
277 else
278 goby::glog.is_warn() &&
279 goby::glog << "UDPM: Send error: " << ec.message()
280 << std::endl;
281 });
282 });
283 }
284
285 void _do_publish(const std::string& identifier, const std::vector<char>& bytes)
286 {
288 goby::glog << "UDPM: Publishing for: "
289 << std::string(identifier.begin(),
290 std::find(identifier.begin(), identifier.end(), '\0'))
291 << ", " << bytes.size() << "B" << std::endl;
292
293 const std::size_t header_overhead = identifier.size() + UDPM_PACKET_HEADER_SIZE;
294 const std::size_t payload_bytes = cfg_.udp_payload_bytes();
295
296 std::string id_key(identifier.begin(),
297 std::find(identifier.begin(), identifier.end(), '\0'));
298
299 uint32_t msg_idx = tx_message_index_[id_key]++;
300
301 // single packet for message
302 if (header_overhead + bytes.size() <= payload_bytes)
303 {
304 goby::glog.is_debug3() && goby::glog << "UDPM: Sent in a single packet" << std::endl;
305
306 UDPMPacketHeader hdr;
307 hdr.message_index = msg_idx;
308 hdr.num_packets = 1;
309 hdr.packet_count = 0;
310 hdr.status = UDPMPacketStatus::NORMAL;
311
312 auto pkt = _build_packet(identifier, hdr, bytes.data(), bytes.size());
313
314 TxMessageEntry entry;
315 entry.message_index = msg_idx;
316 entry.packets.push_back(pkt);
317
318 _async_send(pkt);
319 return;
320 }
321
322 const std::size_t max_data_per_packet = payload_bytes - header_overhead;
323 const std::size_t num_packets_needed =
324 (bytes.size() + max_data_per_packet - 1) / max_data_per_packet;
325
326 if (num_packets_needed > std::numeric_limits<uint16_t>::max())
327 {
329 goby::glog << "UDPM: Message too large to packetize: " << bytes.size() << " bytes, "
330 << num_packets_needed << " packets needed" << std::endl;
331 }
332
333 const uint16_t num_pkts = static_cast<uint16_t>(std::min(
334 num_packets_needed, static_cast<std::size_t>(std::numeric_limits<uint16_t>::max())));
335
336 TxMessageEntry entry;
337 entry.message_index = msg_idx;
338 entry.packets.reserve(num_pkts);
339
340 goby::glog.is_debug3() && goby::glog << "UDPM: Sending in " << num_pkts << " packets"
341 << std::endl;
342 for (uint16_t i = 0; i < num_pkts; ++i)
343 {
344 std::size_t offset = static_cast<std::size_t>(i) * max_data_per_packet;
345 std::size_t chunk = std::min(max_data_per_packet, bytes.size() - offset);
346
347 UDPMPacketHeader hdr;
348 hdr.message_index = msg_idx;
349 hdr.num_packets = num_pkts;
350 hdr.packet_count = i;
351 hdr.status = UDPMPacketStatus::NORMAL;
352
353 auto pkt = _build_packet(identifier, hdr, bytes.data() + offset, chunk);
354 entry.packets.push_back(pkt);
355 _async_send(pkt);
356 }
357 }
358
359 void _do_portal_subscribe(const std::string& identifier)
360 {
361 goby::glog.is_debug3() && goby::glog << "UDPM: Subscribe for: " << identifier << " (no-op)"
362 << std::endl;
363 }
364 void _do_portal_unsubscribe(const std::string& identifier)
365 {
366 goby::glog.is_debug3() && goby::glog << "UDPM: Unsubscribe for: " << identifier
367 << " (no-op)" << std::endl;
368 }
369 void _do_portal_wildcard_subscribe()
370 {
371 goby::glog.is_debug3() && goby::glog << "UDPM: Wildcard subscribe (no-op)" << std::endl;
372 }
373 void _do_portal_wildcard_unsubscribe()
374 {
375 goby::glog.is_debug3() && goby::glog << "UDPM: Wildcard unsubscribe (no-op)" << std::endl;
376 }
377
378 void _process_received_packet(std::unique_ptr<std::unique_lock<std::mutex>>& lock,
379 const std::string& raw)
380 {
381 auto null_it = std::find(raw.begin(), raw.end(),
383 if (null_it == raw.end())
384 {
386 << "UDPM: Received packet with no null terminator, dropping"
387 << std::endl;
388 return;
389 }
390
391 std::string id_key(raw.begin(), null_it);
392
393 auto header_start = null_it + 1;
394 if (raw.end() - header_start < static_cast<std::ptrdiff_t>(UDPM_PACKET_HEADER_SIZE))
395 {
397 goby::glog << "UDPM: Received packet too short for header, dropping" << std::endl;
398 return;
399 }
400
401 UDPMPacketHeader hdr = decode_header(&*header_start);
402 auto data_begin = header_start + UDPM_PACKET_HEADER_SIZE;
403
405 goby::glog << "UDPM: Received packet for " << id_key << " msg_idx=" << hdr.message_index
406 << " num_pkts=" << hdr.num_packets << " pkt_cnt=" << hdr.packet_count
407 << " status=" << static_cast<int>(hdr.status)
408 << " data=" << (raw.end() - data_begin) << "B" << std::endl;
409
410 if (hdr.num_packets == 1)
411 {
412 std::string full_data(raw.begin(), null_it + 1);
413 full_data.append(data_begin, raw.end());
414 this->_handle_received_data(lock, full_data);
415 return;
416 }
417
418 auto& partial = rx_partial_[id_key];
419 if (partial.message_index != hdr.message_index)
420 {
421 goby::glog.is_warn() && goby::glog << "UDPM: Dropping partial packet for " << id_key
422 << " msg_idx=" << hdr.message_index << std::endl;
423 partial = RxPartialMessage();
424 }
425
426 if (partial.num_packets == 0)
427 partial.num_packets = hdr.num_packets;
428
429 partial.received_packets[hdr.packet_count] = std::vector<char>(data_begin, raw.end());
430
431 if (partial.received_packets.size() == hdr.num_packets)
432 {
433 std::string reassembled(raw.begin(), null_it + 1);
434 for (uint16_t i = 0; i < hdr.num_packets; ++i)
435 {
436 auto& pkt = partial.received_packets[i];
437 reassembled.append(pkt.begin(), pkt.end());
438 }
439 partial = RxPartialMessage();
440 this->_handle_received_data(lock, reassembled);
441 }
442 }
443
444 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock)
445 {
446 int items = 0;
447
448 std::deque<std::string> local_rx;
449 {
450 std::lock_guard<std::mutex> l(rx_mutex_);
451 local_rx.swap(rx_);
452 }
453
454 while (!local_rx.empty())
455 {
456 ++items;
457 _process_received_packet(lock, local_rx.front());
458 local_rx.pop_front();
459 }
460
461 return items;
462 }
463
464 private:
// Reassembly state for one fragmented incoming message.
struct RxPartialMessage
{
    // Compared against the message_index of each incoming fragment.
    uint32_t message_index = 0;
    // Fragment count of the message; 0 until taken from a fragment header.
    uint16_t num_packets = 0;
    // Fragment data received so far, keyed by the header's packet_count.
    std::unordered_map<uint16_t, std::vector<char>> received_packets;
};
471
// The packets built for one outgoing message, together with its index.
struct TxMessageEntry
{
    // Index of the message the packets belong to.
    uint32_t message_index = 0;
    // The message's packets, each shared with whoever is sending it.
    std::vector<std::shared_ptr<std::vector<char>>> packets;
};
477
478 // -----------------------------------------------------------------------
479 // Read-only configuration: written once in constructor, then safe to read
480 // from any thread without synchronization.
481 // -----------------------------------------------------------------------
482 const protobuf::InterProcessPortalConfig cfg_;
483
484 // -----------------------------------------------------------------------
485 // io_thread_ exclusive state
486 //
487 // All members of IOState are accessed ONLY from io_thread_, or from
488 // _init() before io_thread_ is started. Do NOT access these directly
489 // from the main thread once io_thread_ is running.
490 //
491 // Methods that run on io_thread_:
492 // _start_async_receive() -- arms / re-arms async receives
493 // async_receive_from completion handler -- pushes to rx_ then notifies
494 // async_send_to operation -- posted by _do_socket_send() via boost::asio::post
495 // -----------------------------------------------------------------------
496 struct IOState
497 {
499 boost::asio::ip::udp::socket socket{ctx};
500 boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work{
501 ctx.get_executor()};
502 boost::asio::ip::udp::endpoint sender_endpoint; // updated by async_receive_from
503 boost::asio::ip::udp::endpoint transmit_endpoint; // set once in _init, then read-only
504
505 static constexpr int max_udp_size{65507};
506 std::array<char, max_udp_size> rx_buffer{}; // staging buffer for incoming datagrams
507 } io_;
508
509 // -----------------------------------------------------------------------
510 // Shared state: written by io_thread_, read (swapped) by main thread.
511 //
512 // rx_ : io_thread_ pushes completed datagrams; main thread swaps it out
513 // in _poll(). All accesses must hold rx_mutex_.
514 // -----------------------------------------------------------------------
515 std::mutex rx_mutex_;
516 std::deque<std::string> rx_;
517
518 // -----------------------------------------------------------------------
519 // Main thread exclusive state
520 //
521 // These members are accessed ONLY from the main thread.
522 //
523 // Methods that run on the main thread:
524 // _init() -- setup; starts io_thread_
525 // _build_packet() -- serializes outgoing datagrams
526 // _async_send() -- rate-limiter; calls _do_socket_send()
527 // _do_socket_send() -- posts async_send_to to io_thread_
528 // _do_publish() -- fragments and enqueues outgoing messages
529 // _do_portal_subscribe/unsubscribe/wildcard_* -- subscription bookkeeping
530 // _process_received_packet() -- reassembles and dispatches incoming data
531 // _poll() -- drains rx_ and calls _process_received_packet
532 // -----------------------------------------------------------------------
533 std::unordered_map<std::string, uint32_t> tx_message_index_;
534 std::unordered_map<std::string, RxPartialMessage> rx_partial_;
535 std::chrono::steady_clock::time_point next_send_time_{std::chrono::steady_clock::now()};
536
537 // -----------------------------------------------------------------------
538 // Thread management
539 // -----------------------------------------------------------------------
540 std::thread io_thread_; // runs io_.ctx.run()
541};
542
543template <typename InnerTransporter = middleware::NullTransporter>
547
548template <typename InnerTransporter = middleware::NullTransporter>
551
552} // namespace udpm
553} // namespace goby
554
555#endif
Implements the forwarder concept for the interprocess layer.
InterProcessPortalImplementation(InnerTransporter &inner, const protobuf::InterProcessPortalConfig &cfg)
InterProcessPortalImplementation(const protobuf::InterProcessPortalConfig &cfg)
void set_lock_action(logger_lock::LockAction lock_action)
UDPMPacketHeader decode_header(const char *buf)
static constexpr std::size_t UDPM_PACKET_HEADER_SIZE
void encode_header(char *buf, const UDPMPacketHeader &h)
The global namespace for the Goby project.
util::FlexOstream glog
Access the Goby logger through this object.
ImplementationTag for udpm interprocess transporters.
Definition tags.h:36