Goby3 3.5.1
2026.06.04
Loading...
Searching...
No Matches
udp_one_to_many.h
Go to the documentation of this file.
1// Copyright 2019-2026:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6// Copilot <198982749+Copilot@users.noreply.github.com>
7//
8//
9// This file is part of the Goby Underwater Autonomy Project Libraries
10// ("The Goby Libraries").
11//
12// The Goby Libraries are free software: you can redistribute them and/or modify
13// them under the terms of the GNU Lesser General Public License as published by
14// the Free Software Foundation, either version 2.1 of the License, or
15// (at your option) any later version.
16//
17// The Goby Libraries are distributed in the hope that they will be useful,
18// but WITHOUT ANY WARRANTY; without even the implied warranty of
19// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20// GNU Lesser General Public License for more details.
21//
22// You should have received a copy of the GNU Lesser General Public License
23// along with Goby. If not, see <http://www.gnu.org/licenses/>.
24
25#ifndef GOBY_MIDDLEWARE_IO_UDP_ONE_TO_MANY_H
26#define GOBY_MIDDLEWARE_IO_UDP_ONE_TO_MANY_H
27
28#include <array> // for array
29#include <boost/asio/buffer.hpp> // for buffer
30#include <boost/asio/ip/udp.hpp> // for udp, udp::endpoint
31#include <boost/asio/socket_base.hpp> // for socket_base
32#include <boost/system/error_code.hpp> // for error_code
33#include <cstddef> // for size_t
34#include <memory> // for shared_ptr, __s...
35#include <string> // for string, to_string
36
37#include "goby/exception.h" // for Exception
38#include "goby/middleware/io/detail/io_interface.h" // for PubSubLayer
39#include "goby/middleware/protobuf/io.pb.h" // for IOData, UDPEndP...
40#include "goby/middleware/protobuf/udp_config.pb.h" // for UDPOneToManyConfig
42
43namespace goby
44{
45namespace middleware
46{
47class Group;
48}
49} // namespace goby
50
51namespace goby
52{
53namespace middleware
54{
55namespace io
56{
57template <const goby::middleware::Group& line_in_group,
58 const goby::middleware::Group& line_out_group,
59 // by default publish all incoming traffic to interprocess for logging
61 // but only subscribe on interthread for outgoing traffic
62 PubSubLayer subscribe_layer = PubSubLayer::INTERTHREAD,
64 template <class> class ThreadType = goby::zeromq::SimpleThread,
65 bool use_indexed_groups = false>
67 : public detail::IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, Config,
68 boost::asio::ip::udp::socket, ThreadType, use_indexed_groups>
69{
70 using Base =
71 detail::IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, Config,
72 boost::asio::ip::udp::socket, ThreadType, use_indexed_groups>;
73
74 public:
77 UDPOneToManyThread(const Config& config, int index = -1, bool is_final = true)
78 : Base(config, index, std::string("udp: ") + std::to_string(config.bind_port()))
79 {
80 if (is_final)
81 {
83 this->interthread().template publish<line_in_group>(ready);
84 }
85 }
86
87 ~UDPOneToManyThread() override {}
88
89 protected:
91 virtual void async_read() override;
92
94 virtual void
95 async_write(std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg) override;
96
97 private:
99 void open_socket() override;
100
101 private:
102 static constexpr int max_udp_size{65507};
103 std::array<char, max_udp_size> rx_message_;
104 boost::asio::ip::udp::endpoint sender_endpoint_;
105 boost::asio::ip::udp::endpoint local_endpoint_;
106};
107} // namespace io
108} // namespace middleware
109} // namespace goby
110
111template <const goby::middleware::Group& line_in_group,
112 const goby::middleware::Group& line_out_group,
114 goby::middleware::io::PubSubLayer subscribe_layer, typename Config,
115 template <class> class ThreadType, bool use_indexed_groups>
116void goby::middleware::io::UDPOneToManyThread<line_in_group, line_out_group, publish_layer,
117 subscribe_layer, Config, ThreadType,
118 use_indexed_groups>::open_socket()
119{
120 auto protocol = this->cfg().ipv6() ? boost::asio::ip::udp::v6() : boost::asio::ip::udp::v4();
121 this->mutable_socket().open(protocol);
122
123 if (this->cfg().set_reuseaddr())
124 {
125 // SO_REUSEADDR
126 boost::asio::socket_base::reuse_address option(true);
127 this->mutable_socket().set_option(option);
128 }
129
130 if (this->cfg().set_broadcast())
131 {
132 // SO_BROADCAST
133 this->mutable_socket().set_option(boost::asio::socket_base::broadcast(true));
134 }
135
136 this->mutable_socket().bind(boost::asio::ip::udp::endpoint(protocol, this->cfg().bind_port()));
137 local_endpoint_ = this->mutable_socket().local_endpoint();
138}
139
140template <const goby::middleware::Group& line_in_group,
141 const goby::middleware::Group& line_out_group,
143 goby::middleware::io::PubSubLayer subscribe_layer, typename Config,
144 template <class> class ThreadType, bool use_indexed_groups>
145void goby::middleware::io::UDPOneToManyThread<line_in_group, line_out_group, publish_layer,
146 subscribe_layer, Config, ThreadType,
147 use_indexed_groups>::async_read()
148{
149 this->mutable_socket().async_receive_from(
150 boost::asio::buffer(rx_message_), sender_endpoint_,
151 [this](const boost::system::error_code& ec, size_t bytes_transferred)
152 {
153 if (!ec && bytes_transferred > 0)
154 {
155 auto io_msg = std::make_shared<goby::middleware::protobuf::IOData>();
156 *io_msg->mutable_data() =
157 std::string(rx_message_.begin(), rx_message_.begin() + bytes_transferred);
158
159 *io_msg->mutable_udp_src() =
160 detail::endpoint_convert<protobuf::UDPEndPoint>(sender_endpoint_);
161 *io_msg->mutable_udp_dest() =
162 detail::endpoint_convert<protobuf::UDPEndPoint>(local_endpoint_);
163
164 this->handle_read_success(bytes_transferred, io_msg);
165 this->async_read();
166 }
167 else
168 {
169 this->handle_read_error(ec);
170 }
171 });
172}
173
174template <const goby::middleware::Group& line_in_group,
175 const goby::middleware::Group& line_out_group,
177 goby::middleware::io::PubSubLayer subscribe_layer, typename Config,
178 template <class> class ThreadType, bool use_indexed_groups>
180 line_in_group, line_out_group, publish_layer, subscribe_layer, Config, ThreadType,
181 use_indexed_groups>::async_write(std::shared_ptr<const goby::middleware::protobuf::IOData>
182 io_msg)
183{
184 if (!io_msg->has_udp_dest())
185 throw(goby::Exception("UDPOneToManyThread requires 'udp_dest' field to be set in IOData"));
186
187 boost::asio::ip::udp::resolver resolver(this->mutable_io());
188 boost::asio::ip::udp::endpoint remote_endpoint =
189 resolver
190 .resolve(io_msg->udp_dest().addr(), std::to_string(io_msg->udp_dest().port()),
191 boost::asio::ip::resolver_base::numeric_service)
192 .begin()
193 ->endpoint();
194
195 this->mutable_socket().async_send_to(
196 boost::asio::buffer(io_msg->data()), remote_endpoint,
197 [this, io_msg](const boost::system::error_code& ec, std::size_t bytes_transferred)
198 {
199 if (!ec && bytes_transferred > 0)
200 {
201 this->handle_write_success(bytes_transferred);
202 }
203 else
204 {
205 this->handle_write_error(ec);
206 }
207 });
208}
209
210#endif
simple exception class for goby applications
Definition exception.h:35
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:61
virtual void async_read() override
Starts an asynchronous read on the udp socket.
UDPOneToManyThread(const Config &config, int index=-1, bool is_final=true)
Constructs the thread.
virtual void async_write(std::shared_ptr< const goby::middleware::protobuf::IOData > io_msg) override
Starts an asynchronous write from data published.
std::string to_string(goby::middleware::protobuf::Layer layer)
Definition common.h:44
middleware::SimpleThread< Config, detail::InterProcessTag > SimpleThread
Zeromq-backed SimpleThread. Derives from middleware::SimpleThread using InterProcessTag.
The global namespace for the Goby project.
STL namespace.