118 :
public PortalBase<InterProcessPortalImplementation<InnerTransporter, PortalBase,
120 InnerTransporter, ImplementationTag>
125 InnerTransporter, ImplementationTag>;
136 :
Base(inner), cfg_(cfg)
147 if (io_thread_.joinable())
156 friend typename Base::Base;
157 friend typename Base::Common;
168 boost::asio::ip::address listen_address =
169 boost::asio::ip::make_address(cfg_.listen_address());
170 boost::asio::ip::address multicast_address =
171 boost::asio::ip::make_address(cfg_.multicast_address());
172 short multicast_port =
static_cast<short>(cfg_.multicast_port());
174 boost::asio::ip::udp::endpoint listen_endpoint(listen_address, multicast_port);
175 io_.transmit_endpoint = boost::asio::ip::udp::endpoint(multicast_address, multicast_port);
177 io_.socket.open(listen_endpoint.protocol());
178 io_.socket.set_option(boost::asio::ip::udp::socket::reuse_address(
true));
179 io_.socket.bind(listen_endpoint);
180 io_.socket.set_option(boost::asio::ip::multicast::join_group(multicast_address));
184 _start_async_receive();
185 io_thread_ = std::thread([
this]() { io_.ctx.run(); });
193 void _start_async_receive()
195 io_.socket.async_receive_from(
196 boost::asio::buffer(io_.rx_buffer), io_.sender_endpoint,
197 [
this](boost::system::error_code ec, std::size_t length)
202 std::lock_guard<std::mutex> l(rx_mutex_);
203 rx_.push_back(std::string(io_.rx_buffer.begin(),
204 io_.rx_buffer.begin() + length));
210 std::lock_guard<std::mutex> l(*this->poll_mutex());
212 this->cv()->notify_all();
213 _start_async_receive();
222 std::shared_ptr<std::vector<char>> _build_packet(
const std::string& identifier,
223 const UDPMPacketHeader& hdr,
224 const char* data_begin, std::size_t data_len)
226 auto pkt = std::make_shared<std::vector<char>>();
228 pkt->insert(pkt->end(), identifier.begin(), identifier.end());
233 pkt->insert(pkt->end(), data_begin, data_begin + data_len);
237 void _async_send(std::shared_ptr<std::vector<char>> pkt)
239 if (cfg_.max_send_rate_bytes_per_second() > 0)
241 auto now = std::chrono::steady_clock::now();
243 std::this_thread::sleep_until(next_send_time_);
245 double delay_sec =
static_cast<double>(pkt->size()) /
246 static_cast<double>(cfg_.max_send_rate_bytes_per_second());
248 if (
static_cast<long>(delay_sec * 1.0e9) < std::numeric_limits<long>::max())
250 now + std::chrono::nanoseconds(
static_cast<long>(1e9 * delay_sec));
251 else if (
static_cast<long>(delay_sec * 1.0e6) < std::numeric_limits<long>::max())
254 now + std::chrono::microseconds(
static_cast<long>(1e6 * delay_sec));
257 now + std::chrono::milliseconds(
static_cast<long>(1e3 * delay_sec));
260 _do_socket_send(std::move(pkt));
263 void _do_socket_send(std::shared_ptr<std::vector<char>> pkt)
267 boost::asio::post(io_.ctx,
268 [
this, pkt = std::move(pkt)]()
270 io_.socket.async_send_to(
271 boost::asio::buffer(*pkt), io_.transmit_endpoint,
272 [pkt](boost::system::error_code ec, std::size_t length)
275 goby::glog.is_debug3() &&
276 goby::glog <<
"UDPM: Sent " << length <<
"B"
279 goby::glog.is_warn() &&
280 goby::glog <<
"UDPM: Send error: " << ec.message()
286 void _do_publish(
const std::string& identifier,
const std::vector<char>& bytes)
290 << std::string(identifier.begin(),
291 std::find(identifier.begin(), identifier.end(),
'\0'))
292 <<
", " << bytes.size() <<
"B" << std::endl;
295 const std::size_t payload_bytes = cfg_.udp_payload_bytes();
297 std::string id_key(identifier.begin(),
298 std::find(identifier.begin(), identifier.end(),
'\0'));
300 uint32_t msg_idx = tx_message_index_[id_key]++;
303 if (header_overhead + bytes.size() <= payload_bytes)
307 UDPMPacketHeader hdr;
308 hdr.message_index = msg_idx;
310 hdr.packet_count = 0;
311 hdr.status = UDPMPacketStatus::NORMAL;
313 auto pkt = _build_packet(identifier, hdr, bytes.data(), bytes.size());
315 TxMessageEntry entry;
316 entry.message_index = msg_idx;
317 entry.packets.push_back(pkt);
323 const std::size_t max_data_per_packet = payload_bytes - header_overhead;
324 const std::size_t num_packets_needed =
325 (bytes.size() + max_data_per_packet - 1) / max_data_per_packet;
327 if (num_packets_needed > std::numeric_limits<uint16_t>::max())
330 goby::glog <<
"UDPM: Message too large to packetize: " << bytes.size() <<
" bytes, "
331 << num_packets_needed <<
" packets needed" << std::endl;
334 const uint16_t num_pkts =
static_cast<uint16_t
>(std::min(
335 num_packets_needed,
static_cast<std::size_t
>(std::numeric_limits<uint16_t>::max())));
337 TxMessageEntry entry;
338 entry.message_index = msg_idx;
339 entry.packets.reserve(num_pkts);
343 for (uint16_t i = 0; i < num_pkts; ++i)
345 std::size_t offset =
static_cast<std::size_t
>(i) * max_data_per_packet;
346 std::size_t chunk = std::min(max_data_per_packet, bytes.size() - offset);
348 UDPMPacketHeader hdr;
349 hdr.message_index = msg_idx;
350 hdr.num_packets = num_pkts;
351 hdr.packet_count = i;
352 hdr.status = UDPMPacketStatus::NORMAL;
354 auto pkt = _build_packet(identifier, hdr, bytes.data() + offset, chunk);
355 entry.packets.push_back(pkt);
360 void _do_portal_subscribe(
const std::string& identifier)
365 void _do_portal_unsubscribe(
const std::string& identifier)
368 <<
" (no-op)" << std::endl;
370 void _do_portal_wildcard_subscribe()
374 void _do_portal_wildcard_unsubscribe()
379 void _process_received_packet(std::unique_ptr<std::unique_lock<std::mutex>>& lock,
380 const std::string& raw)
382 auto null_it = std::find(raw.begin(), raw.end(),
384 if (null_it == raw.end())
387 <<
"UDPM: Received packet with no null terminator, dropping"
392 std::string id_key(raw.begin(), null_it);
394 auto header_start = null_it + 1;
395 if (raw.end() - header_start <
static_cast<std::ptrdiff_t
>(UDPM_PACKET_HEADER_SIZE))
398 goby::glog <<
"UDPM: Received packet too short for header, dropping" << std::endl;
406 goby::glog <<
"UDPM: Received packet for " << id_key <<
" msg_idx=" << hdr.message_index
407 <<
" num_pkts=" << hdr.num_packets <<
" pkt_cnt=" << hdr.packet_count
408 <<
" status=" <<
static_cast<int>(hdr.status)
409 <<
" data=" << (raw.end() - data_begin) <<
"B" << std::endl;
411 if (hdr.num_packets == 1)
413 std::string full_data(raw.begin(), null_it + 1);
414 full_data.append(data_begin, raw.end());
415 this->_handle_received_data(lock, full_data);
419 auto& partial = rx_partial_[id_key];
420 if (partial.message_index != hdr.message_index)
423 <<
" msg_idx=" << hdr.message_index << std::endl;
424 partial = RxPartialMessage();
427 if (partial.num_packets == 0)
428 partial.num_packets = hdr.num_packets;
430 partial.received_packets[hdr.packet_count] = std::vector<char>(data_begin, raw.end());
432 if (partial.received_packets.size() == hdr.num_packets)
434 std::string reassembled(raw.begin(), null_it + 1);
435 for (uint16_t i = 0; i < hdr.num_packets; ++i)
437 auto& pkt = partial.received_packets[i];
438 reassembled.append(pkt.begin(), pkt.end());
440 partial = RxPartialMessage();
441 this->_handle_received_data(lock, reassembled);
445 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock)
449 std::deque<std::string> local_rx;
451 std::lock_guard<std::mutex> l(rx_mutex_);
455 while (!local_rx.empty())
458 _process_received_packet(lock, local_rx.front());
459 local_rx.pop_front();
466 struct RxPartialMessage
468 uint32_t message_index{0};
469 uint16_t num_packets{0};
470 std::unordered_map<uint16_t, std::vector<char>> received_packets;
473 struct TxMessageEntry
475 uint32_t message_index{0};
476 std::vector<std::shared_ptr<std::vector<char>>> packets;
483 const protobuf::InterProcessPortalConfig cfg_;
500 boost::asio::ip::udp::socket socket{ctx};
501 boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work{
503 boost::asio::ip::udp::endpoint sender_endpoint;
504 boost::asio::ip::udp::endpoint transmit_endpoint;
506 static constexpr int max_udp_size{65507};
507 std::array<char, max_udp_size> rx_buffer{};
516 std::mutex rx_mutex_;
517 std::deque<std::string> rx_;
534 std::unordered_map<std::string, uint32_t> tx_message_index_;
535 std::unordered_map<std::string, RxPartialMessage> rx_partial_;
536 std::chrono::steady_clock::time_point next_send_time_{std::chrono::steady_clock::now()};
541 std::thread io_thread_;