Goby3 3.2.3
2025.05.13
Loading...
Searching...
No Matches
tcp_server_interface.h
Go to the documentation of this file.
1// Copyright 2020-2023:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6//
7//
8// This file is part of the Goby Underwater Autonomy Project Libraries
9// ("The Goby Libraries").
10//
11// The Goby Libraries are free software: you can redistribute them and/or modify
12// them under the terms of the GNU Lesser General Public License as published by
13// the Free Software Foundation, either version 2.1 of the License, or
14// (at your option) any later version.
15//
16// The Goby Libraries are distributed in the hope that they will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU Lesser General Public License for more details.
20//
21// You should have received a copy of the GNU Lesser General Public License
22// along with Goby. If not, see <http://www.gnu.org/licenses/>.
23
24#ifndef GOBY_MIDDLEWARE_IO_DETAIL_TCP_SERVER_INTERFACE_H
25#define GOBY_MIDDLEWARE_IO_DETAIL_TCP_SERVER_INTERFACE_H
26
27#include <memory> // for shared_ptr
28#include <ostream> // for endl, basic_...
29#include <set> // for set
30#include <string> // for operator<<
31#include <utility> // for move
32
33#include <boost/asio/buffer.hpp> // for buffer
34#include <boost/asio/error.hpp> // for eof, make_er...
35#include <boost/asio/ip/tcp.hpp> // for tcp, tcp::en...
36#include <boost/asio/write.hpp> // for async_write
37#include <boost/system/error_code.hpp> // for error_code
38
39#include "goby/exception.h" // for Exception
40#include "goby/middleware/io/detail/io_interface.h" // for PubSubLayer
41#include "goby/middleware/io/groups.h" // for tcp_server_e...
42#include "goby/middleware/protobuf/io.pb.h" // for TCPServerEvent
43#include "goby/middleware/protobuf/tcp_config.pb.h" // for TCPServerConfig
44#include "goby/util/debug_logger/flex_ostream.h" // for glog, FlexOs...
46namespace goby
47{
48namespace middleware
49{
50class Group;
51}
52} // namespace goby
53
54namespace goby
55{
56namespace middleware
57{
58namespace protobuf
59{
60inline bool operator<(const TCPEndPoint& ep_a, const TCPEndPoint& ep_b)
61{
62 return (ep_a.addr() == ep_b.addr()) ? ep_a.port() < ep_b.port() : ep_a.addr() < ep_b.addr();
63}
64
65inline bool operator==(const TCPEndPoint& ep_a, const TCPEndPoint& ep_b)
66{
67 return (ep_a.addr() == ep_b.addr()) && (ep_a.port() == ep_b.port());
68}
69} // namespace protobuf
70namespace io
71{
72namespace detail
73{
74template <typename TCPServerThreadType>
75class TCPSession : public std::enable_shared_from_this<TCPSession<TCPServerThreadType>>
76{
77 public:
78 TCPSession(boost::asio::ip::tcp::socket socket, TCPServerThreadType& server)
79 : socket_(std::move(socket)),
80 server_(server),
81 remote_endpoint_(socket_.remote_endpoint()),
82 local_endpoint_(socket_.local_endpoint())
83 {
84 }
85
86 virtual ~TCPSession()
87 {
88 auto event = std::make_shared<goby::middleware::protobuf::TCPServerEvent>();
89 if (server_.index() != -1)
90 event->set_index(server_.index());
92 *event->mutable_local_endpoint() = endpoint_convert<protobuf::TCPEndPoint>(local_endpoint_);
93 *event->mutable_remote_endpoint() =
94 endpoint_convert<protobuf::TCPEndPoint>(remote_endpoint_);
95 event->set_number_of_clients(server_.clients_.size());
96 goby::glog.is_debug2() && goby::glog << group(server_.glog_group())
97 << "Event: " << event->ShortDebugString() << std::endl;
98 server_.publish_in(event);
99 }
100
101 void start()
102 {
103 server_.clients_.insert(this->shared_from_this());
104
105 auto event = std::make_shared<goby::middleware::protobuf::TCPServerEvent>();
106 if (server_.index() != -1)
107 event->set_index(server_.index());
109 *event->mutable_local_endpoint() = endpoint_convert<protobuf::TCPEndPoint>(local_endpoint_);
110 *event->mutable_remote_endpoint() =
111 endpoint_convert<protobuf::TCPEndPoint>(remote_endpoint_);
112 event->set_number_of_clients(server_.clients_.size());
113 goby::glog.is_debug2() && goby::glog << group(server_.glog_group())
114 << "Event: " << event->ShortDebugString() << std::endl;
115 server_.publish_in(event);
116 async_read();
117 }
118
119 const boost::asio::ip::tcp::endpoint& remote_endpoint() { return remote_endpoint_; }
120 const boost::asio::ip::tcp::endpoint& local_endpoint() { return local_endpoint_; }
121
122 const std::string& glog_group() { return server_.glog_group(); }
123
124 // public so TCPServer can call this
125 virtual void async_write(std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg)
126 {
127 auto self(this->shared_from_this());
128 boost::asio::async_write(
129 socket_, boost::asio::buffer(io_msg->data()),
130 [this, self, io_msg](boost::system::error_code ec, std::size_t bytes_transferred) {
131 if (!ec)
132 {
133 this->handle_write_success(bytes_transferred);
134 }
135 else
136 {
137 this->handle_write_error(ec);
138 }
139 });
140 }
141
142 protected:
143 void handle_write_success(std::size_t bytes_transferred)
144 {
145 server_.handle_write_success(bytes_transferred);
146 }
147 void handle_write_error(const boost::system::error_code& ec)
148 {
149 goby::glog.is_warn() && goby::glog << "Write error: " << ec.message() << std::endl;
150 server_.clients_.erase(this->shared_from_this());
151 }
152
153 void handle_read_success(std::size_t bytes_transferred,
154 std::shared_ptr<goby::middleware::protobuf::IOData> io_msg)
155 {
156 *io_msg->mutable_tcp_src() = endpoint_convert<protobuf::TCPEndPoint>(remote_endpoint_);
157 *io_msg->mutable_tcp_dest() = endpoint_convert<protobuf::TCPEndPoint>(local_endpoint_);
158
159 server_.handle_read_success(bytes_transferred, io_msg);
160 }
161
162 void handle_read_error(const boost::system::error_code& ec)
163 {
164 if (ec != boost::asio::error::eof)
165 goby::glog.is_warn() && goby::glog << "Read error: " << ec.message() << std::endl;
166 // erase ourselves from the client list to ensure destruction
167 server_.clients_.erase(this->shared_from_this());
168 }
169
170 const typename TCPServerThreadType::ConfigType& cfg() { return server_.cfg(); }
171
172 boost::asio::ip::tcp::socket& mutable_socket() { return socket_; }
173
174 private:
175 virtual void async_read() = 0;
176
177 private:
178 boost::asio::ip::tcp::socket socket_;
179 TCPServerThreadType& server_;
180 boost::asio::ip::tcp::endpoint remote_endpoint_;
181 boost::asio::ip::tcp::endpoint local_endpoint_;
182};
183
184template <const goby::middleware::Group& line_in_group,
185 const goby::middleware::Group& line_out_group, PubSubLayer publish_layer,
186 PubSubLayer subscribe_layer, typename Config, template <class> class ThreadType,
187 bool use_indexed_groups = false>
189 : public IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, Config,
190 boost::asio::ip::tcp::acceptor, ThreadType, use_indexed_groups>
191{
192 using Base = IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, Config,
193 boost::asio::ip::tcp::acceptor, ThreadType, use_indexed_groups>;
194
195 using ConfigType = Config;
196
197 public:
200 TCPServerThread(const Config& config, int index = -1)
201 : Base(config, index, std::string("tcp-l: ") + std::to_string(config.bind_port())),
202 tcp_socket_(this->mutable_io())
203 {
204 auto ready = ThreadState::SUBSCRIPTIONS_COMPLETE;
205 this->interthread().template publish<line_in_group>(ready);
206 }
207
208 ~TCPServerThread() override {}
209
210 template <typename TCPServerThreadType> friend class TCPSession;
211
212 private:
214 void async_read() override { async_accept(); }
215 void async_accept();
216
218 void async_write(std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg) override;
219
221 void open_socket() override { open_acceptor(); }
222 void open_acceptor();
223
224 virtual void start_session(boost::asio::ip::tcp::socket tcp_socket) = 0;
225
226 private:
227 boost::asio::ip::tcp::endpoint remote_endpoint_;
228 boost::asio::ip::tcp::endpoint local_endpoint_;
229
230 boost::asio::ip::tcp::socket tcp_socket_;
231
232 std::set<std::shared_ptr<
233 TCPSession<TCPServerThread<line_in_group, line_out_group, publish_layer, subscribe_layer,
234 Config, ThreadType, use_indexed_groups>>>>
235 clients_;
236};
237} // namespace detail
238} // namespace io
239} // namespace middleware
240} // namespace goby
241
242template <const goby::middleware::Group& line_in_group,
243 const goby::middleware::Group& line_out_group,
245 goby::middleware::io::PubSubLayer subscribe_layer, typename Config,
246 template <class> class ThreadType, bool use_indexed_groups>
247void goby::middleware::io::detail::TCPServerThread<line_in_group, line_out_group, publish_layer,
248 subscribe_layer, Config, ThreadType,
249 use_indexed_groups>::open_acceptor()
250{
251 auto& acceptor = this->mutable_socket();
252 auto protocol = this->cfg().ipv6() ? boost::asio::ip::tcp::v6() : boost::asio::ip::tcp::v4();
253 acceptor.open(protocol);
254
255 if (this->cfg().set_reuseaddr())
256 {
257 // SO_REUSEADDR
258 boost::asio::socket_base::reuse_address option(true);
259 acceptor.set_option(option);
260 }
261
262 acceptor.bind(boost::asio::ip::tcp::endpoint(protocol, this->cfg().bind_port()));
263 acceptor.listen();
264
266 goby::glog << group(this->glog_group())
267 << "Successfully bound acceptor to port: " << this->cfg().bind_port()
268 << " and began listening" << std::endl;
269
270 local_endpoint_ = acceptor.local_endpoint();
271
272 auto event = std::make_shared<goby::middleware::protobuf::TCPServerEvent>();
273 if (this->index() != -1)
274 event->set_index(this->index());
276 *event->mutable_local_endpoint() = endpoint_convert<protobuf::TCPEndPoint>(local_endpoint_);
277 goby::glog.is_debug2() && goby::glog << group(this->glog_group())
278 << "Event: " << event->ShortDebugString() << std::endl;
279 this->publish_in(event);
280}
281
282template <const goby::middleware::Group& line_in_group,
283 const goby::middleware::Group& line_out_group,
285 goby::middleware::io::PubSubLayer subscribe_layer, typename Config,
286 template <class> class ThreadType, bool use_indexed_groups>
287void goby::middleware::io::detail::TCPServerThread<line_in_group, line_out_group, publish_layer,
288 subscribe_layer, Config, ThreadType,
289 use_indexed_groups>::async_accept()
290{
291 auto& acceptor = this->mutable_socket();
292 acceptor.async_accept(tcp_socket_, [this](boost::system::error_code ec) {
293 if (!ec)
294 {
295 goby::glog.is_debug2() && goby::glog << group(this->glog_group())
296 << "Received connection from: "
297 << tcp_socket_.remote_endpoint() << std::endl;
298
299 start_session(std::move(tcp_socket_));
300
301 this->async_accept();
302 }
303 else
304 {
305 this->handle_read_error(ec);
306 }
307 });
308}
309
310template <const goby::middleware::Group& line_in_group,
311 const goby::middleware::Group& line_out_group,
313 goby::middleware::io::PubSubLayer subscribe_layer, typename Config,
314 template <class> class ThreadType, bool use_indexed_groups>
316 line_in_group, line_out_group, publish_layer, subscribe_layer, Config, ThreadType,
317 use_indexed_groups>::async_write(std::shared_ptr<const goby::middleware::protobuf::IOData>
318 io_msg)
319{
320 if (!io_msg->has_tcp_dest())
321 throw(goby::Exception("TCPServerThread requires 'tcp_dest' field to be set in IOData"));
322 else if (!io_msg->tcp_dest().all_clients() &&
323 (!io_msg->tcp_dest().has_addr() || !io_msg->tcp_dest().has_port()))
324 throw(goby::Exception("TCPServerThread requires 'tcp_dest' field to have 'addr'/'port' set "
325 "or all_clients=true in IOData"));
326
327 for (auto& client : clients_)
328 {
329 if (io_msg->tcp_dest().all_clients() ||
330 (io_msg->tcp_dest() ==
331 endpoint_convert<protobuf::TCPEndPoint>(client->remote_endpoint())))
332 {
333 client->async_write(io_msg);
334 }
335 }
336}
337
338#endif
simple exception class for goby applications
Definition exception.h:35
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:60
TCPServerThread(const Config &config, int index=-1)
Constructs the thread.
virtual void async_write(std::shared_ptr< const goby::middleware::protobuf::IOData > io_msg)
void handle_write_success(std::size_t bytes_transferred)
const TCPServerThreadType::ConfigType & cfg()
void handle_write_error(const boost::system::error_code &ec)
const boost::asio::ip::tcp::endpoint & remote_endpoint()
TCPSession(boost::asio::ip::tcp::socket socket, TCPServerThreadType &server)
const boost::asio::ip::tcp::endpoint & local_endpoint()
void handle_read_error(const boost::system::error_code &ec)
boost::asio::ip::tcp::socket & mutable_socket()
void handle_read_success(std::size_t bytes_transferred, std::shared_ptr< goby::middleware::protobuf::IOData > io_msg)
const std::string & addr() const
Definition io.pb.h:2337
static constexpr Event EVENT_BIND
Definition io.pb.h:1426
static constexpr Event EVENT_CONNECT
Definition io.pb.h:1428
static constexpr Event EVENT_DISCONNECT
Definition io.pb.h:1430
NLOHMANN_BASIC_JSON_TPL_DECLARATION std::string to_string(const NLOHMANN_BASIC_JSON_TPL &j)
user-defined to_string function for JSON values
Definition json.hpp:24301
goby::util::logger::GroupSetter group(std::string n)
detail namespace with internal helper functions
Definition json.hpp:247
bool operator<(const TCPEndPoint &ep_a, const TCPEndPoint &ep_b)
bool operator==(const TCPEndPoint &ep_a, const TCPEndPoint &ep_b)
The global namespace for the Goby project.
util::FlexOstream glog
Access the Goby logger through this object.
STL namespace.