Goby3 3.5.0
2026.05.29
Loading...
Searching...
No Matches
interprocess.h
Go to the documentation of this file.
1// Copyright 2026:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Copilot <198982749+Copilot@users.noreply.github.com>
6// Toby Schneider <toby@gobysoft.org>
7//
8//
9// This file is part of the Goby Underwater Autonomy Project Libraries
10// ("The Goby Libraries").
11//
12// The Goby Libraries are free software: you can redistribute them and/or modify
13// them under the terms of the GNU Lesser General Public License as published by
14// the Free Software Foundation, either version 2.1 of the License, or
15// (at your option) any later version.
16//
17// The Goby Libraries are distributed in the hope that they will be useful,
18// but WITHOUT ANY WARRANTY; without even the implied warranty of
19// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20// GNU Lesser General Public License for more details.
21//
22// You should have received a copy of the GNU Lesser General Public License
23// along with Goby. If not, see <http://www.gnu.org/licenses/>.
24
25#ifndef GOBY_UDPM_TRANSPORT_INTERPROCESS_H
26#define GOBY_UDPM_TRANSPORT_INTERPROCESS_H
27
28#include <arpa/inet.h>
29#include <array>
30#include <boost/asio/buffer.hpp>
31#include <boost/asio/executor_work_guard.hpp>
32#include <boost/asio/ip/multicast.hpp>
33#include <boost/asio/ip/udp.hpp>
34#include <boost/asio/post.hpp>
35#include <boost/asio/socket_base.hpp>
36#include <boost/asio/steady_timer.hpp>
37#include <boost/circular_buffer.hpp>
38#include <chrono>
39#include <cstring>
40#include <deque>
41#include <memory>
42#include <mutex>
43#include <set>
44#include <thread>
45#include <unordered_map>
46
51
54
55namespace goby
56{
57namespace middleware
58{
59template <typename Data> class Publisher;
60} // namespace middleware
61
62namespace udpm
63{
64
/// Status value carried in the last byte of every UDPM packet header.
enum class UDPMPacketStatus : uint8_t
{
    /// Ordinary data packet (the only status defined here).
    NORMAL = 0
};
70
// Size of the fixed packet header that follows the null-terminated identifier
// in each datagram (9 bytes in total):
//   message_index : uint32 (network byte order)
//   num_packets   : uint16 (network byte order)
//   packet_count  : uint16 (network byte order)
//   status        : uint8
static constexpr std::size_t UDPM_PACKET_HEADER_SIZE =
    sizeof(uint32_t) + 2 * sizeof(uint16_t) + sizeof(uint8_t);
78
86
87inline void encode_header(char* buf, const UDPMPacketHeader& h)
88{
89 uint32_t mi = htonl(h.message_index);
90 uint16_t np = htons(h.num_packets);
91 uint16_t pc = htons(h.packet_count);
92 std::memcpy(buf, &mi, 4);
93 std::memcpy(buf + 4, &np, 2);
94 std::memcpy(buf + 6, &pc, 2);
95 buf[8] = static_cast<uint8_t>(h.status);
96}
97
98inline UDPMPacketHeader decode_header(const char* buf)
99{
101 uint32_t mi;
102 uint16_t np, pc;
103 std::memcpy(&mi, buf, 4);
104 std::memcpy(&np, buf + 4, 2);
105 std::memcpy(&pc, buf + 6, 2);
106 h.message_index = ntohl(mi);
107 h.num_packets = ntohs(np);
108 h.packet_count = ntohs(pc);
109 h.status = static_cast<UDPMPacketStatus>(static_cast<uint8_t>(buf[8]));
110 return h;
111}
112
113template <typename InnerTransporter,
114 template <typename Derived, typename InnerTransporterType,
115 typename ImplementationTag_> class PortalBase,
116 typename ImplementationTag>
118 : public PortalBase<InterProcessPortalImplementation<InnerTransporter, PortalBase,
119 ImplementationTag>,
120 InnerTransporter, ImplementationTag>
121{
122 public:
124 ImplementationTag>,
125 InnerTransporter, ImplementationTag>;
126
128
130 {
131 _init();
132 }
133
134 InterProcessPortalImplementation(InnerTransporter& inner,
136 : Base(inner), cfg_(cfg)
137 {
138 _init();
139 }
140
142 {
143 // Close socket first to cancel any pending async operations,
144 // then release the work guard so io_.ctx.run() can return, then join.
145 io_.socket.close();
146 io_.work.reset();
147 if (io_thread_.joinable())
148 io_thread_.join();
149 }
150
// Intentionally empty: no hold is implemented in UDPM, so there is nothing
// to do here.
void ready()
{
}
// Always returns false; this class never holds.
bool hold_state()
{
    return false;
}
154
155 friend Base;
156 friend typename Base::Base;
157 friend typename Base::Common;
158
159 private:
160 // -----------------------------------------------------------------------
161 // Called from main thread before io_thread_ starts; sets up socket and
162 // launches io_thread_.
163 // -----------------------------------------------------------------------
164 void _init()
165 {
167
168 boost::asio::ip::address listen_address =
169 boost::asio::ip::make_address(cfg_.listen_address());
170 boost::asio::ip::address multicast_address =
171 boost::asio::ip::make_address(cfg_.multicast_address());
172 short multicast_port = static_cast<short>(cfg_.multicast_port());
173
174 boost::asio::ip::udp::endpoint listen_endpoint(listen_address, multicast_port);
175 io_.transmit_endpoint = boost::asio::ip::udp::endpoint(multicast_address, multicast_port);
176
177 io_.socket.open(listen_endpoint.protocol());
178 io_.socket.set_option(boost::asio::ip::udp::socket::reuse_address(true));
179 io_.socket.bind(listen_endpoint);
180 io_.socket.set_option(boost::asio::ip::multicast::join_group(multicast_address));
181
182 // Arm the first async receive before starting io_thread_ so the
183 // operation is already queued when io_.ctx.run() is called.
184 _start_async_receive();
185 io_thread_ = std::thread([this]() { io_.ctx.run(); });
186 }
187
188 // -----------------------------------------------------------------------
189 // Runs on io_thread_: arms (or re-arms) a single async datagram receive.
190 // Called once from _init() (main thread, before io_thread_ starts) and
191 // then re-posted from within the completion handler on io_thread_.
192 // -----------------------------------------------------------------------
193 void _start_async_receive()
194 {
195 io_.socket.async_receive_from(
196 boost::asio::buffer(io_.rx_buffer), io_.sender_endpoint,
197 [this](boost::system::error_code ec, std::size_t length)
198 {
199 if (!ec)
200 {
201 {
202 std::lock_guard<std::mutex> l(rx_mutex_);
203 rx_.push_back(std::string(io_.rx_buffer.begin(),
204 io_.rx_buffer.begin() + length));
205 }
206 // Acquire poll_mutex briefly to ensure the main thread is not in the
207 // limbo region between _poll_all() releasing the lock and calling
208 // cv_.wait(). Without this, notify_all() could be missed.
209 {
210 std::lock_guard<std::mutex> l(*this->poll_mutex());
211 }
212 this->cv()->notify_all();
213 _start_async_receive();
214 }
215 });
216 }
217
218 // -----------------------------------------------------------------------
219 // Main thread helpers
220 // -----------------------------------------------------------------------
221
222 std::shared_ptr<std::vector<char>> _build_packet(const std::string& identifier,
223 const UDPMPacketHeader& hdr,
224 const char* data_begin, std::size_t data_len)
225 {
226 auto pkt = std::make_shared<std::vector<char>>();
227 pkt->reserve(identifier.size() + UDPM_PACKET_HEADER_SIZE + data_len);
228 pkt->insert(pkt->end(), identifier.begin(), identifier.end());
229 char hdr_buf[UDPM_PACKET_HEADER_SIZE];
230 encode_header(hdr_buf, hdr);
231 pkt->insert(pkt->end(), hdr_buf, hdr_buf + UDPM_PACKET_HEADER_SIZE);
232 if (data_len > 0)
233 pkt->insert(pkt->end(), data_begin, data_begin + data_len);
234 return pkt;
235 }
236
237 void _async_send(std::shared_ptr<std::vector<char>> pkt)
238 {
239 if (cfg_.max_send_rate_bytes_per_second() > 0)
240 {
241 auto now = std::chrono::steady_clock::now();
242
243 std::this_thread::sleep_until(next_send_time_);
244
245 double delay_sec = static_cast<double>(pkt->size()) /
246 static_cast<double>(cfg_.max_send_rate_bytes_per_second());
247
248 if (static_cast<long>(delay_sec * 1.0e9) < std::numeric_limits<long>::max())
249 next_send_time_ =
250 now + std::chrono::nanoseconds(static_cast<long>(1e9 * delay_sec));
251 else if (static_cast<long>(delay_sec * 1.0e6) < std::numeric_limits<long>::max())
252
253 next_send_time_ =
254 now + std::chrono::microseconds(static_cast<long>(1e6 * delay_sec));
255 else
256 next_send_time_ =
257 now + std::chrono::milliseconds(static_cast<long>(1e3 * delay_sec));
258 }
259
260 _do_socket_send(std::move(pkt));
261 }
262
263 void _do_socket_send(std::shared_ptr<std::vector<char>> pkt)
264 {
265 // Post the actual socket write to io_thread_ so that the socket is
266 // only ever touched from io_thread_.
267 boost::asio::post(io_.ctx,
268 [this, pkt = std::move(pkt)]()
269 {
270 io_.socket.async_send_to(
271 boost::asio::buffer(*pkt), io_.transmit_endpoint,
272 [pkt](boost::system::error_code ec, std::size_t length)
273 {
274 if (!ec)
275 goby::glog.is_debug3() &&
276 goby::glog << "UDPM: Sent " << length << "B"
277 << std::endl;
278 else
279 goby::glog.is_warn() &&
280 goby::glog << "UDPM: Send error: " << ec.message()
281 << std::endl;
282 });
283 });
284 }
285
286 void _do_publish(const std::string& identifier, const std::vector<char>& bytes)
287 {
289 goby::glog << "UDPM: Publishing for: "
290 << std::string(identifier.begin(),
291 std::find(identifier.begin(), identifier.end(), '\0'))
292 << ", " << bytes.size() << "B" << std::endl;
293
294 const std::size_t header_overhead = identifier.size() + UDPM_PACKET_HEADER_SIZE;
295 const std::size_t payload_bytes = cfg_.udp_payload_bytes();
296
297 std::string id_key(identifier.begin(),
298 std::find(identifier.begin(), identifier.end(), '\0'));
299
300 uint32_t msg_idx = tx_message_index_[id_key]++;
301
302 // single packet for message
303 if (header_overhead + bytes.size() <= payload_bytes)
304 {
305 goby::glog.is_debug3() && goby::glog << "UDPM: Sent in a single packet" << std::endl;
306
307 UDPMPacketHeader hdr;
308 hdr.message_index = msg_idx;
309 hdr.num_packets = 1;
310 hdr.packet_count = 0;
311 hdr.status = UDPMPacketStatus::NORMAL;
312
313 auto pkt = _build_packet(identifier, hdr, bytes.data(), bytes.size());
314
315 TxMessageEntry entry;
316 entry.message_index = msg_idx;
317 entry.packets.push_back(pkt);
318
319 _async_send(pkt);
320 return;
321 }
322
323 const std::size_t max_data_per_packet = payload_bytes - header_overhead;
324 const std::size_t num_packets_needed =
325 (bytes.size() + max_data_per_packet - 1) / max_data_per_packet;
326
327 if (num_packets_needed > std::numeric_limits<uint16_t>::max())
328 {
330 goby::glog << "UDPM: Message too large to packetize: " << bytes.size() << " bytes, "
331 << num_packets_needed << " packets needed" << std::endl;
332 }
333
334 const uint16_t num_pkts = static_cast<uint16_t>(std::min(
335 num_packets_needed, static_cast<std::size_t>(std::numeric_limits<uint16_t>::max())));
336
337 TxMessageEntry entry;
338 entry.message_index = msg_idx;
339 entry.packets.reserve(num_pkts);
340
341 goby::glog.is_debug3() && goby::glog << "UDPM: Sending in " << num_pkts << " packets"
342 << std::endl;
343 for (uint16_t i = 0; i < num_pkts; ++i)
344 {
345 std::size_t offset = static_cast<std::size_t>(i) * max_data_per_packet;
346 std::size_t chunk = std::min(max_data_per_packet, bytes.size() - offset);
347
348 UDPMPacketHeader hdr;
349 hdr.message_index = msg_idx;
350 hdr.num_packets = num_pkts;
351 hdr.packet_count = i;
352 hdr.status = UDPMPacketStatus::NORMAL;
353
354 auto pkt = _build_packet(identifier, hdr, bytes.data() + offset, chunk);
355 entry.packets.push_back(pkt);
356 _async_send(pkt);
357 }
358 }
359
360 void _do_portal_subscribe(const std::string& identifier)
361 {
362 goby::glog.is_debug3() && goby::glog << "UDPM: Subscribe for: " << identifier << " (no-op)"
363 << std::endl;
364 }
365 void _do_portal_unsubscribe(const std::string& identifier)
366 {
367 goby::glog.is_debug3() && goby::glog << "UDPM: Unsubscribe for: " << identifier
368 << " (no-op)" << std::endl;
369 }
370 void _do_portal_wildcard_subscribe()
371 {
372 goby::glog.is_debug3() && goby::glog << "UDPM: Wildcard subscribe (no-op)" << std::endl;
373 }
374 void _do_portal_wildcard_unsubscribe()
375 {
376 goby::glog.is_debug3() && goby::glog << "UDPM: Wildcard unsubscribe (no-op)" << std::endl;
377 }
378
379 void _process_received_packet(std::unique_ptr<std::unique_lock<std::mutex>>& lock,
380 const std::string& raw)
381 {
382 auto null_it = std::find(raw.begin(), raw.end(),
384 if (null_it == raw.end())
385 {
387 << "UDPM: Received packet with no null terminator, dropping"
388 << std::endl;
389 return;
390 }
391
392 std::string id_key(raw.begin(), null_it);
393
394 auto header_start = null_it + 1;
395 if (raw.end() - header_start < static_cast<std::ptrdiff_t>(UDPM_PACKET_HEADER_SIZE))
396 {
398 goby::glog << "UDPM: Received packet too short for header, dropping" << std::endl;
399 return;
400 }
401
402 UDPMPacketHeader hdr = decode_header(&*header_start);
403 auto data_begin = header_start + UDPM_PACKET_HEADER_SIZE;
404
406 goby::glog << "UDPM: Received packet for " << id_key << " msg_idx=" << hdr.message_index
407 << " num_pkts=" << hdr.num_packets << " pkt_cnt=" << hdr.packet_count
408 << " status=" << static_cast<int>(hdr.status)
409 << " data=" << (raw.end() - data_begin) << "B" << std::endl;
410
411 if (hdr.num_packets == 1)
412 {
413 std::string full_data(raw.begin(), null_it + 1);
414 full_data.append(data_begin, raw.end());
415 this->_handle_received_data(lock, full_data);
416 return;
417 }
418
419 auto& partial = rx_partial_[id_key];
420 if (partial.message_index != hdr.message_index)
421 {
422 goby::glog.is_warn() && goby::glog << "UDPM: Dropping partial packet for " << id_key
423 << " msg_idx=" << hdr.message_index << std::endl;
424 partial = RxPartialMessage();
425 }
426
427 if (partial.num_packets == 0)
428 partial.num_packets = hdr.num_packets;
429
430 partial.received_packets[hdr.packet_count] = std::vector<char>(data_begin, raw.end());
431
432 if (partial.received_packets.size() == hdr.num_packets)
433 {
434 std::string reassembled(raw.begin(), null_it + 1);
435 for (uint16_t i = 0; i < hdr.num_packets; ++i)
436 {
437 auto& pkt = partial.received_packets[i];
438 reassembled.append(pkt.begin(), pkt.end());
439 }
440 partial = RxPartialMessage();
441 this->_handle_received_data(lock, reassembled);
442 }
443 }
444
445 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock)
446 {
447 int items = 0;
448
449 std::deque<std::string> local_rx;
450 {
451 std::lock_guard<std::mutex> l(rx_mutex_);
452 local_rx.swap(rx_);
453 }
454
455 while (!local_rx.empty())
456 {
457 ++items;
458 _process_received_packet(lock, local_rx.front());
459 local_rx.pop_front();
460 }
461
462 return items;
463 }
464
465 private:
// Reassembly state for one multi-packet message that is still arriving.
struct RxPartialMessage
{
    // index of the message these fragments belong to
    uint32_t message_index = 0;
    // expected number of fragments; 0 until a fragment has set it
    uint16_t num_packets = 0;
    // fragment payloads received so far, keyed by the header's packet_count
    std::unordered_map<uint16_t, std::vector<char>> received_packets;
};
472
// The set of datagrams built for one published message.
struct TxMessageEntry
{
    // index of the published message
    uint32_t message_index = 0;
    // the message's datagrams, in the order they were built
    std::vector<std::shared_ptr<std::vector<char>>> packets;
};
478
479 // -----------------------------------------------------------------------
480 // Read-only configuration: written once in constructor, then safe to read
481 // from any thread without synchronization.
482 // -----------------------------------------------------------------------
483 const protobuf::InterProcessPortalConfig cfg_;
484
485 // -----------------------------------------------------------------------
486 // io_thread_ exclusive state
487 //
488 // All members of IOState are accessed ONLY from io_thread_, or from
489 // _init() before io_thread_ is started. Do NOT access these directly
490 // from the main thread once io_thread_ is running.
491 //
492 // Methods that run on io_thread_:
493 // _start_async_receive() -- arms / re-arms async receives
494 // async_receive_from completion handler -- pushes to rx_ then notifies
495 // async_send_to operation -- posted by _do_socket_send() via boost::asio::post
496 // -----------------------------------------------------------------------
497 struct IOState
498 {
500 boost::asio::ip::udp::socket socket{ctx};
501 boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work{
502 ctx.get_executor()};
503 boost::asio::ip::udp::endpoint sender_endpoint; // updated by async_receive_from
504 boost::asio::ip::udp::endpoint transmit_endpoint; // set once in _init, then read-only
505
506 static constexpr int max_udp_size{65507};
507 std::array<char, max_udp_size> rx_buffer{}; // staging buffer for incoming datagrams
508 } io_;
509
510 // -----------------------------------------------------------------------
511 // Shared state: written by io_thread_, read (swapped) by main thread.
512 //
513 // rx_ : io_thread_ pushes completed datagrams; main thread swaps it out
514 // in _poll(). All accesses must hold rx_mutex_.
515 // -----------------------------------------------------------------------
516 std::mutex rx_mutex_;
517 std::deque<std::string> rx_;
518
519 // -----------------------------------------------------------------------
520 // Main thread exclusive state
521 //
522 // These members are accessed ONLY from the main thread.
523 //
524 // Methods that run on the main thread:
525 // _init() -- setup; starts io_thread_
526 // _build_packet() -- serializes outgoing datagrams
527 // _async_send() -- rate-limiter; calls _do_socket_send()
528 // _do_socket_send() -- posts async_send_to to io_thread_
529 // _do_publish() -- fragments and enqueues outgoing messages
530 // _do_portal_subscribe/unsubscribe/wildcard_* -- subscription bookkeeping
531 // _process_received_packet() -- reassembles and dispatches incoming data
532 // _poll() -- drains rx_ and calls _process_received_packet
533 // -----------------------------------------------------------------------
534 std::unordered_map<std::string, uint32_t> tx_message_index_;
535 std::unordered_map<std::string, RxPartialMessage> rx_partial_;
536 std::chrono::steady_clock::time_point next_send_time_{std::chrono::steady_clock::now()};
537
538 // -----------------------------------------------------------------------
539 // Thread management
540 // -----------------------------------------------------------------------
541 std::thread io_thread_; // runs io_.ctx.run()
542};
543
544template <typename InnerTransporter = middleware::NullTransporter>
548
549template <typename InnerTransporter = middleware::NullTransporter>
552
553} // namespace udpm
554} // namespace goby
555
556#endif
Implements the forwarder concept for the interprocess layer.
InterProcessPortalImplementation(InnerTransporter &inner, const protobuf::InterProcessPortalConfig &cfg)
InterProcessPortalImplementation(const protobuf::InterProcessPortalConfig &cfg)
void set_lock_action(logger_lock::LockAction lock_action)
UDPMPacketHeader decode_header(const char *buf)
static constexpr std::size_t UDPM_PACKET_HEADER_SIZE
void encode_header(char *buf, const UDPMPacketHeader &h)
The global namespace for the Goby project.
util::FlexOstream glog
Access the Goby logger through this object.
ImplementationTag for udpm interprocess transporters.
Definition tags.h:36