Goby3 3.2.3
2025.05.13
Loading...
Searching...
No Matches
udp_one_to_many.h
Go to the documentation of this file.
1// Copyright 2019-2023:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6//
7//
8// This file is part of the Goby Underwater Autonomy Project Libraries
9// ("The Goby Libraries").
10//
11// The Goby Libraries are free software: you can redistribute them and/or modify
12// them under the terms of the GNU Lesser General Public License as published by
13// the Free Software Foundation, either version 2.1 of the License, or
14// (at your option) any later version.
15//
16// The Goby Libraries are distributed in the hope that they will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU Lesser General Public License for more details.
20//
21// You should have received a copy of the GNU Lesser General Public License
22// along with Goby. If not, see <http://www.gnu.org/licenses/>.
23
24#ifndef GOBY_MIDDLEWARE_IO_UDP_ONE_TO_MANY_H
25#define GOBY_MIDDLEWARE_IO_UDP_ONE_TO_MANY_H
26
27#include <array> // for array
28#include <boost/asio/buffer.hpp> // for buffer
29#include <boost/asio/ip/udp.hpp>
30#include <boost/asio/ip/udp.hpp> // for udp, udp::endpoint
31#include <boost/asio/socket_base.hpp> // for socket_base
32#include <boost/system/error_code.hpp> // for error_code
33#include <cstddef> // for size_t
34#include <memory> // for shared_ptr, __s...
35#include <string> // for string, to_string
36
37#include "goby/exception.h" // for Exception
38#include "goby/middleware/io/detail/io_interface.h" // for PubSubLayer
39#include "goby/middleware/protobuf/io.pb.h" // for IOData, UDPEndP...
40#include "goby/middleware/protobuf/udp_config.pb.h" // for UDPOneToManyConfig
41
42namespace goby
43{
44namespace middleware
45{
46class Group;
47}
48} // namespace goby
49
50namespace goby
51{
52namespace middleware
53{
54namespace io
55{
56template <const goby::middleware::Group& line_in_group,
57 const goby::middleware::Group& line_out_group,
58 // by default publish all incoming traffic to interprocess for logging
60 // but only subscribe on interthread for outgoing traffic
61 PubSubLayer subscribe_layer = PubSubLayer::INTERTHREAD,
63 template <class> class ThreadType = goby::middleware::SimpleThread,
64 bool use_indexed_groups = false>
66 : public detail::IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, Config,
67 boost::asio::ip::udp::socket, ThreadType, use_indexed_groups>
68{
69 using Base =
70 detail::IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, Config,
71 boost::asio::ip::udp::socket, ThreadType, use_indexed_groups>;
72
73 public:
76 UDPOneToManyThread(const Config& config, int index = -1, bool is_final = true)
77 : Base(config, index, std::string("udp: ") + std::to_string(config.bind_port()))
78 {
79 if (is_final)
80 {
82 this->interthread().template publish<line_in_group>(ready);
83 }
84 }
85
86 ~UDPOneToManyThread() override {}
87
88 protected:
90 virtual void async_read() override;
91
93 virtual void
94 async_write(std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg) override;
95
96 private:
98 void open_socket() override;
99
100 private:
101 static constexpr int max_udp_size{65507};
102 std::array<char, max_udp_size> rx_message_;
103 boost::asio::ip::udp::endpoint sender_endpoint_;
104 boost::asio::ip::udp::endpoint local_endpoint_;
105};
106} // namespace io
107} // namespace middleware
108} // namespace goby
109
110template <const goby::middleware::Group& line_in_group,
111 const goby::middleware::Group& line_out_group,
113 goby::middleware::io::PubSubLayer subscribe_layer, typename Config,
114 template <class> class ThreadType, bool use_indexed_groups>
115void goby::middleware::io::UDPOneToManyThread<line_in_group, line_out_group, publish_layer,
116 subscribe_layer, Config, ThreadType,
117 use_indexed_groups>::open_socket()
118{
119 auto protocol = this->cfg().ipv6() ? boost::asio::ip::udp::v6() : boost::asio::ip::udp::v4();
120 this->mutable_socket().open(protocol);
121
122 if (this->cfg().set_reuseaddr())
123 {
124 // SO_REUSEADDR
125 boost::asio::socket_base::reuse_address option(true);
126 this->mutable_socket().set_option(option);
127 }
128
129 if (this->cfg().set_broadcast())
130 {
131 // SO_BROADCAST
132 this->mutable_socket().set_option(boost::asio::socket_base::broadcast(true));
133 }
134
135 this->mutable_socket().bind(boost::asio::ip::udp::endpoint(protocol, this->cfg().bind_port()));
136 local_endpoint_ = this->mutable_socket().local_endpoint();
137}
138
139template <const goby::middleware::Group& line_in_group,
140 const goby::middleware::Group& line_out_group,
142 goby::middleware::io::PubSubLayer subscribe_layer, typename Config,
143 template <class> class ThreadType, bool use_indexed_groups>
144void goby::middleware::io::UDPOneToManyThread<line_in_group, line_out_group, publish_layer,
145 subscribe_layer, Config, ThreadType,
146 use_indexed_groups>::async_read()
147{
148 this->mutable_socket().async_receive_from(
149 boost::asio::buffer(rx_message_), sender_endpoint_,
150 [this](const boost::system::error_code& ec, size_t bytes_transferred)
151 {
152 if (!ec && bytes_transferred > 0)
153 {
154 auto io_msg = std::make_shared<goby::middleware::protobuf::IOData>();
155 *io_msg->mutable_data() =
156 std::string(rx_message_.begin(), rx_message_.begin() + bytes_transferred);
157
158 *io_msg->mutable_udp_src() =
159 detail::endpoint_convert<protobuf::UDPEndPoint>(sender_endpoint_);
160 *io_msg->mutable_udp_dest() =
161 detail::endpoint_convert<protobuf::UDPEndPoint>(local_endpoint_);
162
163 this->handle_read_success(bytes_transferred, io_msg);
164 this->async_read();
165 }
166 else
167 {
168 this->handle_read_error(ec);
169 }
170 });
171}
172
173template <const goby::middleware::Group& line_in_group,
174 const goby::middleware::Group& line_out_group,
176 goby::middleware::io::PubSubLayer subscribe_layer, typename Config,
177 template <class> class ThreadType, bool use_indexed_groups>
179 line_in_group, line_out_group, publish_layer, subscribe_layer, Config, ThreadType,
180 use_indexed_groups>::async_write(std::shared_ptr<const goby::middleware::protobuf::IOData>
181 io_msg)
182{
183 if (!io_msg->has_udp_dest())
184 throw(goby::Exception("UDPOneToManyThread requires 'udp_dest' field to be set in IOData"));
185
186 boost::asio::ip::udp::resolver resolver(this->mutable_io());
187 boost::asio::ip::udp::endpoint remote_endpoint =
188 *resolver.resolve({io_msg->udp_dest().addr(), std::to_string(io_msg->udp_dest().port()),
189 boost::asio::ip::resolver_query_base::numeric_service});
190
191 this->mutable_socket().async_send_to(
192 boost::asio::buffer(io_msg->data()), remote_endpoint,
193 [this, io_msg](const boost::system::error_code& ec, std::size_t bytes_transferred)
194 {
195 if (!ec && bytes_transferred > 0)
196 {
197 this->handle_write_success(bytes_transferred);
198 }
199 else
200 {
201 this->handle_write_error(ec);
202 }
203 });
204}
205
206#endif
simple exception class for goby applications
Definition exception.h:35
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:60
Implements Thread for a three layer middleware setup ([ intervehicle [ interprocess [ interthread ] ]...
virtual void async_read() override
Starts an asynchronous read on the udp socket.
UDPOneToManyThread(const Config &config, int index=-1, bool is_final=true)
Constructs the thread.
virtual void async_write(std::shared_ptr< const goby::middleware::protobuf::IOData > io_msg) override
Starts an asynchronous write from data published.
std::string to_string(goby::middleware::protobuf::Layer layer)
Definition common.h:44
The global namespace for the Goby project.
STL namespace.