117 :
public PortalBase<InterProcessPortalImplementation<InnerTransporter, PortalBase,
119 InnerTransporter, ImplementationTag>
124 InnerTransporter, ImplementationTag>;
135 :
Base(inner), cfg_(cfg)
146 if (io_thread_.joinable())
155 friend typename Base::Base;
156 friend typename Base::Common;
167 boost::asio::ip::address listen_address =
168 boost::asio::ip::make_address(cfg_.listen_address());
169 boost::asio::ip::address multicast_address =
170 boost::asio::ip::make_address(cfg_.multicast_address());
171 short multicast_port =
static_cast<short>(cfg_.multicast_port());
173 boost::asio::ip::udp::endpoint listen_endpoint(listen_address, multicast_port);
174 io_.transmit_endpoint = boost::asio::ip::udp::endpoint(multicast_address, multicast_port);
176 io_.socket.open(listen_endpoint.protocol());
177 io_.socket.set_option(boost::asio::ip::udp::socket::reuse_address(
true));
178 io_.socket.bind(listen_endpoint);
179 io_.socket.set_option(boost::asio::ip::multicast::join_group(multicast_address));
183 _start_async_receive();
184 io_thread_ = std::thread([
this]() { io_.ctx.run(); });
192 void _start_async_receive()
194 io_.socket.async_receive_from(
195 boost::asio::buffer(io_.rx_buffer), io_.sender_endpoint,
196 [
this](boost::system::error_code ec, std::size_t length)
201 std::lock_guard<std::mutex> l(rx_mutex_);
202 rx_.push_back(std::string(io_.rx_buffer.begin(),
203 io_.rx_buffer.begin() + length));
209 std::lock_guard<std::mutex> l(*this->poll_mutex());
211 this->cv()->notify_all();
212 _start_async_receive();
221 std::shared_ptr<std::vector<char>> _build_packet(
const std::string& identifier,
222 const UDPMPacketHeader& hdr,
223 const char* data_begin, std::size_t data_len)
225 auto pkt = std::make_shared<std::vector<char>>();
227 pkt->insert(pkt->end(), identifier.begin(), identifier.end());
232 pkt->insert(pkt->end(), data_begin, data_begin + data_len);
236 void _async_send(std::shared_ptr<std::vector<char>> pkt)
238 if (cfg_.max_send_rate_bytes_per_second() > 0)
240 auto now = std::chrono::steady_clock::now();
242 std::this_thread::sleep_until(next_send_time_);
244 double delay_sec =
static_cast<double>(pkt->size()) /
245 static_cast<double>(cfg_.max_send_rate_bytes_per_second());
247 if (
static_cast<long>(delay_sec * 1.0e9) < std::numeric_limits<long>::max())
249 now + std::chrono::nanoseconds(
static_cast<long>(1e9 * delay_sec));
250 else if (
static_cast<long>(delay_sec * 1.0e6) < std::numeric_limits<long>::max())
253 now + std::chrono::microseconds(
static_cast<long>(1e6 * delay_sec));
256 now + std::chrono::milliseconds(
static_cast<long>(1e3 * delay_sec));
259 _do_socket_send(std::move(pkt));
262 void _do_socket_send(std::shared_ptr<std::vector<char>> pkt)
266 boost::asio::post(io_.ctx,
267 [
this, pkt = std::move(pkt)]()
269 io_.socket.async_send_to(
270 boost::asio::buffer(*pkt), io_.transmit_endpoint,
271 [pkt](boost::system::error_code ec, std::size_t length)
274 goby::glog.is_debug3() &&
275 goby::glog <<
"UDPM: Sent " << length <<
"B"
278 goby::glog.is_warn() &&
279 goby::glog <<
"UDPM: Send error: " << ec.message()
285 void _do_publish(
const std::string& identifier,
const std::vector<char>& bytes)
289 << std::string(identifier.begin(),
290 std::find(identifier.begin(), identifier.end(),
'\0'))
291 <<
", " << bytes.size() <<
"B" << std::endl;
294 const std::size_t payload_bytes = cfg_.udp_payload_bytes();
296 std::string id_key(identifier.begin(),
297 std::find(identifier.begin(), identifier.end(),
'\0'));
299 uint32_t msg_idx = tx_message_index_[id_key]++;
302 if (header_overhead + bytes.size() <= payload_bytes)
306 UDPMPacketHeader hdr;
307 hdr.message_index = msg_idx;
309 hdr.packet_count = 0;
310 hdr.status = UDPMPacketStatus::NORMAL;
312 auto pkt = _build_packet(identifier, hdr, bytes.data(), bytes.size());
314 TxMessageEntry entry;
315 entry.message_index = msg_idx;
316 entry.packets.push_back(pkt);
322 const std::size_t max_data_per_packet = payload_bytes - header_overhead;
323 const std::size_t num_packets_needed =
324 (bytes.size() + max_data_per_packet - 1) / max_data_per_packet;
326 if (num_packets_needed > std::numeric_limits<uint16_t>::max())
329 goby::glog <<
"UDPM: Message too large to packetize: " << bytes.size() <<
" bytes, "
330 << num_packets_needed <<
" packets needed" << std::endl;
333 const uint16_t num_pkts =
static_cast<uint16_t
>(std::min(
334 num_packets_needed,
static_cast<std::size_t
>(std::numeric_limits<uint16_t>::max())));
336 TxMessageEntry entry;
337 entry.message_index = msg_idx;
338 entry.packets.reserve(num_pkts);
342 for (uint16_t i = 0; i < num_pkts; ++i)
344 std::size_t offset =
static_cast<std::size_t
>(i) * max_data_per_packet;
345 std::size_t chunk = std::min(max_data_per_packet, bytes.size() - offset);
347 UDPMPacketHeader hdr;
348 hdr.message_index = msg_idx;
349 hdr.num_packets = num_pkts;
350 hdr.packet_count = i;
351 hdr.status = UDPMPacketStatus::NORMAL;
353 auto pkt = _build_packet(identifier, hdr, bytes.data() + offset, chunk);
354 entry.packets.push_back(pkt);
359 void _do_portal_subscribe(
const std::string& identifier)
364 void _do_portal_unsubscribe(
const std::string& identifier)
367 <<
" (no-op)" << std::endl;
369 void _do_portal_wildcard_subscribe()
373 void _do_portal_wildcard_unsubscribe()
378 void _process_received_packet(std::unique_ptr<std::unique_lock<std::mutex>>& lock,
379 const std::string& raw)
381 auto null_it = std::find(raw.begin(), raw.end(),
383 if (null_it == raw.end())
386 <<
"UDPM: Received packet with no null terminator, dropping"
391 std::string id_key(raw.begin(), null_it);
393 auto header_start = null_it + 1;
394 if (raw.end() - header_start <
static_cast<std::ptrdiff_t
>(UDPM_PACKET_HEADER_SIZE))
397 goby::glog <<
"UDPM: Received packet too short for header, dropping" << std::endl;
405 goby::glog <<
"UDPM: Received packet for " << id_key <<
" msg_idx=" << hdr.message_index
406 <<
" num_pkts=" << hdr.num_packets <<
" pkt_cnt=" << hdr.packet_count
407 <<
" status=" <<
static_cast<int>(hdr.status)
408 <<
" data=" << (raw.end() - data_begin) <<
"B" << std::endl;
410 if (hdr.num_packets == 1)
412 std::string full_data(raw.begin(), null_it + 1);
413 full_data.append(data_begin, raw.end());
414 this->_handle_received_data(lock, full_data);
418 auto& partial = rx_partial_[id_key];
419 if (partial.message_index != hdr.message_index)
422 <<
" msg_idx=" << hdr.message_index << std::endl;
423 partial = RxPartialMessage();
426 if (partial.num_packets == 0)
427 partial.num_packets = hdr.num_packets;
429 partial.received_packets[hdr.packet_count] = std::vector<char>(data_begin, raw.end());
431 if (partial.received_packets.size() == hdr.num_packets)
433 std::string reassembled(raw.begin(), null_it + 1);
434 for (uint16_t i = 0; i < hdr.num_packets; ++i)
436 auto& pkt = partial.received_packets[i];
437 reassembled.append(pkt.begin(), pkt.end());
439 partial = RxPartialMessage();
440 this->_handle_received_data(lock, reassembled);
444 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock)
448 std::deque<std::string> local_rx;
450 std::lock_guard<std::mutex> l(rx_mutex_);
454 while (!local_rx.empty())
457 _process_received_packet(lock, local_rx.front());
458 local_rx.pop_front();
465 struct RxPartialMessage
467 uint32_t message_index{0};
468 uint16_t num_packets{0};
469 std::unordered_map<uint16_t, std::vector<char>> received_packets;
472 struct TxMessageEntry
474 uint32_t message_index{0};
475 std::vector<std::shared_ptr<std::vector<char>>> packets;
482 const protobuf::InterProcessPortalConfig cfg_;
499 boost::asio::ip::udp::socket socket{ctx};
500 boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work{
502 boost::asio::ip::udp::endpoint sender_endpoint;
503 boost::asio::ip::udp::endpoint transmit_endpoint;
505 static constexpr int max_udp_size{65507};
506 std::array<char, max_udp_size> rx_buffer{};
515 std::mutex rx_mutex_;
516 std::deque<std::string> rx_;
533 std::unordered_map<std::string, uint32_t> tx_message_index_;
534 std::unordered_map<std::string, RxPartialMessage> rx_partial_;
535 std::chrono::steady_clock::time_point next_send_time_{std::chrono::steady_clock::now()};
540 std::thread io_thread_;