95 async_write(std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg)
override;
// Opens and configures the UDP socket; the out-of-class definition appears further down in this file.
// Marked 'override', so it replaces a virtual declared in a base class that is not visible in this listing.
99 void open_socket()
override;
// Size, in bytes, of the receive buffer below.
// NOTE(review): 65507 is presumably the largest UDP payload over IPv4
// (65535 - 20 byte IP header - 8 byte UDP header) - confirm.
102 static constexpr int max_udp_size{65507};
// Fixed-size buffer that async_read() passes to async_receive_from() for each incoming datagram.
103 std::array<char, max_udp_size> rx_message_;
// Passed to async_receive_from() as the sender endpoint; reported as 'udp_src' on received messages.
104 boost::asio::ip::udp::endpoint sender_endpoint_;
// The socket's own endpoint, cached by open_socket() after bind(); reported as 'udp_dest' on received messages.
105 boost::asio::ip::udp::endpoint local_endpoint_;
// open_socket(): opens the UDP socket, applies the optional socket options
// requested by the configuration, binds it to the configured port and caches
// the resulting local endpoint.
// NOTE(review): the template head and braces are incomplete in this listing.
115 template <
class>
class ThreadType,
bool use_indexed_groups>
117 subscribe_layer, Config, ThreadType,
118 use_indexed_groups>::open_socket()
// Choose IPv6 or IPv4 according to the 'ipv6' configuration flag, then open the socket.
120 auto protocol = this->cfg().ipv6() ? boost::asio::ip::udp::v6() :
boost::asio::ip::udp::v4();
121 this->mutable_socket().open(protocol);
// Optional: enable the reuse_address socket option when the configuration asks for it.
123 if (this->cfg().set_reuseaddr())
126 boost::asio::socket_base::reuse_address option(
true);
127 this->mutable_socket().set_option(option);
// Optional: enable the broadcast socket option when the configuration asks for it.
130 if (this->cfg().set_broadcast())
133 this->mutable_socket().set_option(boost::asio::socket_base::broadcast(
true));
// Bind using an endpoint built from the protocol and the configured bind_port only
// (no explicit address is supplied here).
136 this->mutable_socket().bind(boost::asio::ip::udp::endpoint(protocol, this->cfg().bind_port()));
// Remember the bound endpoint; async_read() reports it as 'udp_dest' on received messages.
137 local_endpoint_ = this->mutable_socket().local_endpoint();
// async_read(): starts one asynchronous receive into rx_message_. The completion
// handler wraps the received bytes in an IOData message, tagged with source and
// destination endpoints, and passes it to handle_read_success(); otherwise the
// error code is passed to handle_read_error().
// NOTE(review): the template head and braces are incomplete in this listing.
144 template <
class>
class ThreadType,
bool use_indexed_groups>
146 subscribe_layer, Config, ThreadType,
147 use_indexed_groups>::async_read()
// Receive into the member buffer; sender_endpoint_ is supplied as the sender endpoint argument.
// The handler captures 'this' and reads the member buffer and endpoints directly.
149 this->mutable_socket().async_receive_from(
150 boost::asio::buffer(rx_message_), sender_endpoint_,
151 [
this](
const boost::system::error_code& ec,
size_t bytes_transferred)
// Success path: no error and at least one byte received.
153 if (!ec && bytes_transferred > 0)
155 auto io_msg = std::make_shared<goby::middleware::protobuf::IOData>();
// Copy only the first bytes_transferred bytes of the buffer into the message payload.
156 *io_msg->mutable_data() =
157 std::string(rx_message_.begin(), rx_message_.begin() + bytes_transferred);
// Record who sent the datagram.
159 *io_msg->mutable_udp_src() =
160 detail::endpoint_convert<protobuf::UDPEndPoint>(sender_endpoint_);
// Record the local endpoint cached by open_socket().
161 *io_msg->mutable_udp_dest() =
162 detail::endpoint_convert<protobuf::UDPEndPoint>(local_endpoint_);
164 this->handle_read_success(bytes_transferred, io_msg);
// Failure path (the branch keyword separating it from the success path is not visible in this listing).
// NOTE(review): if this is the else-branch of the check above, a zero-length datagram
// received without error would reach handle_read_error() with a non-error 'ec' - confirm.
169 this->handle_read_error(ec);
// async_write(): sends the payload of an IOData message to the destination named
// in its 'udp_dest' field. Throws goby::Exception when 'udp_dest' is not set.
// The send's completion handler calls handle_write_success() or handle_write_error().
// NOTE(review): the template head, parameter name and braces are incomplete in this listing.
178 template <
class>
class ThreadType,
bool use_indexed_groups>
180 line_in_group, line_out_group, publish_layer, subscribe_layer, Config, ThreadType,
181 use_indexed_groups>::async_write(std::shared_ptr<const goby::middleware::protobuf::IOData>
// This thread has no fixed remote endpoint, so every outgoing message must carry its destination.
184 if (!io_msg->has_udp_dest())
185 throw(
goby::Exception(
"UDPOneToManyThread requires 'udp_dest' field to be set in IOData"));
// Resolve the destination address; the port is passed as a decimal string together with
// the numeric_service flag.
// NOTE(review): this looks like the blocking resolve() call made before the async send, and
// the listing does not show how the result becomes an endpoint or how a failed
// resolution is handled - confirm.
187 boost::asio::ip::udp::resolver resolver(this->mutable_io());
188 boost::asio::ip::udp::endpoint remote_endpoint =
190 .resolve(io_msg->udp_dest().addr(), std::to_string(io_msg->udp_dest().port()),
191 boost::asio::ip::resolver_base::numeric_service)
// The handler captures io_msg by value, presumably so the payload referenced by the
// buffer stays alive until the send completes.
195 this->mutable_socket().async_send_to(
196 boost::asio::buffer(io_msg->data()), remote_endpoint,
197 [
this, io_msg](
const boost::system::error_code& ec, std::size_t bytes_transferred)
// Success path: no error and at least one byte written.
199 if (!ec && bytes_transferred > 0)
201 this->handle_write_success(bytes_transferred);
// Failure path (the branch keyword separating it from the success path is not visible in this listing).
205 this->handle_write_error(ec);