Goby3  3.1.4
2024.02.22
tcp_server_interface.h
Go to the documentation of this file.
1 // Copyright 2020-2023:
2 // GobySoft, LLC (2013-)
3 // Community contributors (see AUTHORS file)
4 // File authors:
5 // Toby Schneider <toby@gobysoft.org>
6 //
7 //
8 // This file is part of the Goby Underwater Autonomy Project Libraries
9 // ("The Goby Libraries").
10 //
11 // The Goby Libraries are free software: you can redistribute them and/or modify
12 // them under the terms of the GNU Lesser General Public License as published by
13 // the Free Software Foundation, either version 2.1 of the License, or
14 // (at your option) any later version.
15 //
16 // The Goby Libraries are distributed in the hope that they will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU Lesser General Public License for more details.
20 //
21 // You should have received a copy of the GNU Lesser General Public License
22 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
23 
24 #ifndef GOBY_MIDDLEWARE_IO_DETAIL_TCP_SERVER_INTERFACE_H
25 #define GOBY_MIDDLEWARE_IO_DETAIL_TCP_SERVER_INTERFACE_H
26 
27 #include <memory> // for shared_ptr
28 #include <ostream> // for endl, basic_...
29 #include <set> // for set
30 #include <string> // for operator<<
31 #include <utility> // for move
32 
33 #include <boost/asio/buffer.hpp> // for buffer
34 #include <boost/asio/error.hpp> // for eof, make_er...
35 #include <boost/asio/ip/tcp.hpp> // for tcp, tcp::en...
36 #include <boost/asio/write.hpp> // for async_write
37 #include <boost/system/error_code.hpp> // for error_code
38 
39 #include "goby/exception.h" // for Exception
40 #include "goby/middleware/io/detail/io_interface.h" // for PubSubLayer
41 #include "goby/middleware/io/groups.h" // for tcp_server_e...
42 #include "goby/middleware/protobuf/io.pb.h" // for TCPServerEvent
43 #include "goby/middleware/protobuf/tcp_config.pb.h" // for TCPServerConfig
44 #include "goby/util/debug_logger/flex_ostream.h" // for glog, FlexOs...
46 namespace goby
47 {
48 namespace middleware
49 {
50 class Group;
51 }
52 } // namespace goby
53 
54 namespace goby
55 {
56 namespace middleware
57 {
58 namespace protobuf
59 {
60 inline bool operator<(const TCPEndPoint& ep_a, const TCPEndPoint& ep_b)
61 {
62  return (ep_a.addr() == ep_b.addr()) ? ep_a.port() < ep_b.port() : ep_a.addr() < ep_b.addr();
63 }
64 
65 inline bool operator==(const TCPEndPoint& ep_a, const TCPEndPoint& ep_b)
66 {
67  return (ep_a.addr() == ep_b.addr()) && (ep_a.port() == ep_b.port());
68 }
69 } // namespace protobuf
70 namespace io
71 {
72 namespace detail
73 {
74 template <typename TCPServerThreadType>
75 class TCPSession : public std::enable_shared_from_this<TCPSession<TCPServerThreadType>>
76 {
77  public:
78  TCPSession(boost::asio::ip::tcp::socket socket, TCPServerThreadType& server)
79  : socket_(std::move(socket)),
80  server_(server),
81  remote_endpoint_(socket_.remote_endpoint()),
82  local_endpoint_(socket_.local_endpoint())
83  {
84  }
85 
86  virtual ~TCPSession()
87  {
88  auto event = std::make_shared<goby::middleware::protobuf::TCPServerEvent>();
89  if (server_.index() != -1)
90  event->set_index(server_.index());
92  *event->mutable_local_endpoint() = endpoint_convert<protobuf::TCPEndPoint>(local_endpoint_);
93  *event->mutable_remote_endpoint() =
94  endpoint_convert<protobuf::TCPEndPoint>(remote_endpoint_);
95  event->set_number_of_clients(server_.clients_.size());
96  goby::glog.is_debug2() && goby::glog << group(server_.glog_group())
97  << "Event: " << event->ShortDebugString() << std::endl;
98  server_.publish_in(event);
99  }
100 
101  void start()
102  {
103  server_.clients_.insert(this->shared_from_this());
104 
105  auto event = std::make_shared<goby::middleware::protobuf::TCPServerEvent>();
106  if (server_.index() != -1)
107  event->set_index(server_.index());
109  *event->mutable_local_endpoint() = endpoint_convert<protobuf::TCPEndPoint>(local_endpoint_);
110  *event->mutable_remote_endpoint() =
111  endpoint_convert<protobuf::TCPEndPoint>(remote_endpoint_);
112  event->set_number_of_clients(server_.clients_.size());
113  goby::glog.is_debug2() && goby::glog << group(server_.glog_group())
114  << "Event: " << event->ShortDebugString() << std::endl;
115  server_.publish_in(event);
116  async_read();
117  }
118 
119  const boost::asio::ip::tcp::endpoint& remote_endpoint() { return remote_endpoint_; }
120  const boost::asio::ip::tcp::endpoint& local_endpoint() { return local_endpoint_; }
121 
122  const std::string& glog_group() { return server_.glog_group(); }
123 
124  // public so TCPServer can call this
125  virtual void async_write(std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg)
126  {
127  auto self(this->shared_from_this());
128  boost::asio::async_write(
129  socket_, boost::asio::buffer(io_msg->data()),
130  [this, self, io_msg](boost::system::error_code ec, std::size_t bytes_transferred) {
131  if (!ec)
132  {
133  this->handle_write_success(bytes_transferred);
134  }
135  else
136  {
137  this->handle_write_error(ec);
138  }
139  });
140  }
141 
142  protected:
143  void handle_write_success(std::size_t bytes_transferred)
144  {
145  server_.handle_write_success(bytes_transferred);
146  }
147  void handle_write_error(const boost::system::error_code& ec)
148  {
149  goby::glog.is_warn() && goby::glog << "Write error: " << ec.message() << std::endl;
150  server_.clients_.erase(this->shared_from_this());
151  }
152 
153  void handle_read_success(std::size_t bytes_transferred,
154  std::shared_ptr<goby::middleware::protobuf::IOData> io_msg)
155  {
156  *io_msg->mutable_tcp_src() = endpoint_convert<protobuf::TCPEndPoint>(remote_endpoint_);
157  *io_msg->mutable_tcp_dest() = endpoint_convert<protobuf::TCPEndPoint>(local_endpoint_);
158 
159  server_.handle_read_success(bytes_transferred, io_msg);
160  }
161 
162  void handle_read_error(const boost::system::error_code& ec)
163  {
164  if (ec != boost::asio::error::eof)
165  goby::glog.is_warn() && goby::glog << "Read error: " << ec.message() << std::endl;
166  // erase ourselves from the client list to ensure destruction
167  server_.clients_.erase(this->shared_from_this());
168  }
169 
170  const typename TCPServerThreadType::ConfigType& cfg() { return server_.cfg(); }
171 
172  boost::asio::ip::tcp::socket& mutable_socket() { return socket_; }
173 
174  private:
175  virtual void async_read() = 0;
176 
177  private:
178  boost::asio::ip::tcp::socket socket_;
179  TCPServerThreadType& server_;
180  boost::asio::ip::tcp::endpoint remote_endpoint_;
181  boost::asio::ip::tcp::endpoint local_endpoint_;
182 };
183 
184 template <const goby::middleware::Group& line_in_group,
185  const goby::middleware::Group& line_out_group, PubSubLayer publish_layer,
186  PubSubLayer subscribe_layer, typename Config, template <class> class ThreadType,
187  bool use_indexed_groups = false>
189  : public IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, Config,
190  boost::asio::ip::tcp::acceptor, ThreadType, use_indexed_groups>
191 {
192  using Base = IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, Config,
193  boost::asio::ip::tcp::acceptor, ThreadType, use_indexed_groups>;
194 
195  using ConfigType = Config;
196 
197  public:
200  TCPServerThread(const Config& config, int index = -1)
201  : Base(config, index, std::string("tcp-l: ") + std::to_string(config.bind_port())),
202  tcp_socket_(this->mutable_io())
203  {
205  this->interthread().template publish<line_in_group>(ready);
206  }
207 
208  ~TCPServerThread() override {}
209 
210  template <typename TCPServerThreadType> friend class TCPSession;
211 
212  private:
214  void async_read() override { async_accept(); }
215  void async_accept();
216 
218  void async_write(std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg) override;
219 
221  void open_socket() override { open_acceptor(); }
222  void open_acceptor();
223 
224  virtual void start_session(boost::asio::ip::tcp::socket tcp_socket) = 0;
225 
226  private:
227  boost::asio::ip::tcp::endpoint remote_endpoint_;
228  boost::asio::ip::tcp::endpoint local_endpoint_;
229 
230  boost::asio::ip::tcp::socket tcp_socket_;
231 
232  std::set<std::shared_ptr<
233  TCPSession<TCPServerThread<line_in_group, line_out_group, publish_layer, subscribe_layer,
234  Config, ThreadType, use_indexed_groups>>>>
235  clients_;
236 };
237 } // namespace detail
238 } // namespace io
239 } // namespace middleware
240 } // namespace goby
241 
242 template <const goby::middleware::Group& line_in_group,
243  const goby::middleware::Group& line_out_group,
244  goby::middleware::io::PubSubLayer publish_layer,
245  goby::middleware::io::PubSubLayer subscribe_layer, typename Config,
246  template <class> class ThreadType, bool use_indexed_groups>
247 void goby::middleware::io::detail::TCPServerThread<line_in_group, line_out_group, publish_layer,
248  subscribe_layer, Config, ThreadType,
249  use_indexed_groups>::open_acceptor()
250 {
251  auto& acceptor = this->mutable_socket();
252  auto protocol = this->cfg().ipv6() ? boost::asio::ip::tcp::v6() : boost::asio::ip::tcp::v4();
253  acceptor.open(protocol);
254 
255  if (this->cfg().set_reuseaddr())
256  {
257  // SO_REUSEADDR
258  boost::asio::socket_base::reuse_address option(true);
259  acceptor.set_option(option);
260  }
261 
262  acceptor.bind(boost::asio::ip::tcp::endpoint(protocol, this->cfg().bind_port()));
263  acceptor.listen();
264 
265  goby::glog.is_debug2() &&
266  goby::glog << group(this->glog_group())
267  << "Successfully bound acceptor to port: " << this->cfg().bind_port()
268  << " and began listening" << std::endl;
269 
270  local_endpoint_ = acceptor.local_endpoint();
271 
272  auto event = std::make_shared<goby::middleware::protobuf::TCPServerEvent>();
273  if (this->index() != -1)
274  event->set_index(this->index());
276  *event->mutable_local_endpoint() = endpoint_convert<protobuf::TCPEndPoint>(local_endpoint_);
277  goby::glog.is_debug2() && goby::glog << group(this->glog_group())
278  << "Event: " << event->ShortDebugString() << std::endl;
279  this->publish_in(event);
280 }
281 
282 template <const goby::middleware::Group& line_in_group,
283  const goby::middleware::Group& line_out_group,
284  goby::middleware::io::PubSubLayer publish_layer,
285  goby::middleware::io::PubSubLayer subscribe_layer, typename Config,
286  template <class> class ThreadType, bool use_indexed_groups>
287 void goby::middleware::io::detail::TCPServerThread<line_in_group, line_out_group, publish_layer,
288  subscribe_layer, Config, ThreadType,
289  use_indexed_groups>::async_accept()
290 {
291  auto& acceptor = this->mutable_socket();
292  acceptor.async_accept(tcp_socket_, [this](boost::system::error_code ec) {
293  if (!ec)
294  {
295  goby::glog.is_debug2() && goby::glog << group(this->glog_group())
296  << "Received connection from: "
297  << tcp_socket_.remote_endpoint() << std::endl;
298 
299  start_session(std::move(tcp_socket_));
300 
301  this->async_accept();
302  }
303  else
304  {
305  this->handle_read_error(ec);
306  }
307  });
308 }
309 
310 template <const goby::middleware::Group& line_in_group,
311  const goby::middleware::Group& line_out_group,
312  goby::middleware::io::PubSubLayer publish_layer,
313  goby::middleware::io::PubSubLayer subscribe_layer, typename Config,
314  template <class> class ThreadType, bool use_indexed_groups>
316  line_in_group, line_out_group, publish_layer, subscribe_layer, Config, ThreadType,
317  use_indexed_groups>::async_write(std::shared_ptr<const goby::middleware::protobuf::IOData>
318  io_msg)
319 {
320  if (!io_msg->has_tcp_dest())
321  throw(goby::Exception("TCPServerThread requires 'tcp_dest' field to be set in IOData"));
322  else if (!io_msg->tcp_dest().all_clients() &&
323  (!io_msg->tcp_dest().has_addr() || !io_msg->tcp_dest().has_port()))
324  throw(goby::Exception("TCPServerThread requires 'tcp_dest' field to have 'addr'/'port' set "
325  "or all_clients=true in IOData"));
326 
327  for (auto& client : clients_)
328  {
329  if (io_msg->tcp_dest().all_clients() ||
330  (io_msg->tcp_dest() ==
331  endpoint_convert<protobuf::TCPEndPoint>(client->remote_endpoint())))
332  {
333  client->async_write(io_msg);
334  }
335  }
336 }
337 
338 #endif
goby::middleware::protobuf::TCPServerEvent::EVENT_DISCONNECT
static const Event EVENT_DISCONNECT
Definition: io.pb.h:1092
goby::middleware::io::detail::TCPSession::async_write
virtual void async_write(std::shared_ptr< const goby::middleware::protobuf::IOData > io_msg)
Definition: tcp_server_interface.h:125
goby::middleware::protobuf::TCPEndPoint::has_port
bool has_port() const
Definition: io.pb.h:1838
goby::middleware::io::detail::TCPSession::local_endpoint
const boost::asio::ip::tcp::endpoint & local_endpoint()
Definition: tcp_server_interface.h:120
io.pb.h
goby
The global namespace for the Goby project.
Definition: acomms_constants.h:33
goby::util::FlexOstream::is_warn
bool is_warn()
Definition: flex_ostream.h:82
goby::middleware::io::detail::TCPSession::TCPSession
TCPSession(boost::asio::ip::tcp::socket socket, TCPServerThreadType &server)
Definition: tcp_server_interface.h:78
goby::middleware::io::detail::TCPServerThread::TCPServerThread
TCPServerThread(const Config &config, int index=-1)
Constructs the thread.
Definition: tcp_server_interface.h:200
goby::middleware::protobuf::IOData::tcp_dest
const ::goby::middleware::protobuf::TCPEndPoint & tcp_dest() const
Definition: io.pb.h:2072
goby::middleware::Thread::cfg
const Config & cfg() const
Definition: thread.h:201
goby::middleware::io::detail::TCPSession::start
void start()
Definition: tcp_server_interface.h:101
goby::acomms::abc::protobuf::config
extern ::google::protobuf::internal::ExtensionIdentifier< ::goby::acomms::protobuf::DriverConfig, ::google::protobuf::internal::MessageTypeTraits< ::goby::acomms::abc::protobuf::Config >, 11, false > config
Definition: abc_driver.pb.h:203
goby::middleware::protobuf::TCPServerEvent::EVENT_CONNECT
static const Event EVENT_CONNECT
Definition: io.pb.h:1090
goby::middleware::io::detail::TCPSession::handle_write_success
void handle_write_success(std::size_t bytes_transferred)
Definition: tcp_server_interface.h:143
detail
detail namespace with internal helper functions
Definition: json.hpp:246
goby::middleware::io::detail::TCPSession::~TCPSession
virtual ~TCPSession()
Definition: tcp_server_interface.h:86
group
goby::util::logger::GroupSetter group(std::string n)
Definition: logger_manipulators.h:134
boost
Definition: udp_driver.h:41
goby::middleware::io::detail::TCPServerThread::~TCPServerThread
~TCPServerThread() override
Definition: tcp_server_interface.h:208
goby::util::FlexOstream::is_debug2
bool is_debug2()
Definition: flex_ostream.h:85
goby::middleware::io::detail::TCPSession::glog_group
const std::string & glog_group()
Definition: tcp_server_interface.h:122
tcp_config.pb.h
goby::middleware::io::detail::TCPSession
Definition: tcp_server_interface.h:75
goby::middleware::io::detail::TCPSession::cfg
const TCPServerThreadType::ConfigType & cfg()
Definition: tcp_server_interface.h:170
goby::middleware::protobuf::operator<
bool operator<(const TCPEndPoint &ep_a, const TCPEndPoint &ep_b)
Definition: tcp_server_interface.h:60
goby::middleware::io::detail::TCPSession::handle_write_error
void handle_write_error(const boost::system::error_code &ec)
Definition: tcp_server_interface.h:147
goby::middleware::io::PubSubLayer
PubSubLayer
Definition: io_transporters.h:38
goby::middleware::protobuf::TCPEndPoint
Definition: io.pb.h:345
goby::middleware::protobuf::IOData::data
const ::std::string & data() const
Definition: io.pb.h:2103
goby::middleware::protobuf::operator==
bool operator==(const TCPEndPoint &ep_a, const TCPEndPoint &ep_b)
Definition: tcp_server_interface.h:65
goby::middleware::protobuf::TCPEndPoint::addr
const ::std::string & addr() const
Definition: io.pb.h:1785
goby::middleware::io::detail::TCPSession::handle_read_error
void handle_read_error(const boost::system::error_code &ec)
Definition: tcp_server_interface.h:162
goby::middleware::protobuf::TCPEndPoint::has_addr
bool has_addr() const
Definition: io.pb.h:1772
flex_ostream.h
groups.h
goby::middleware::protobuf::IOData::has_tcp_dest
bool has_tcp_dest() const
Definition: io.pb.h:2046
goby::middleware::Group
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition: group.h:58
goby::middleware::io::detail::TCPSession::mutable_socket
boost::asio::ip::tcp::socket & mutable_socket()
Definition: tcp_server_interface.h:172
logger_manipulators.h
goby::middleware::io::detail::TCPServerThread
Definition: tcp_server_interface.h:188
goby::middleware::io::detail::TCPSession::handle_read_success
void handle_read_success(std::size_t bytes_transferred, std::shared_ptr< goby::middleware::protobuf::IOData > io_msg)
Definition: tcp_server_interface.h:153
io_interface.h
goby::middleware::io::detail::IOThread
Definition: io_interface.h:79
goby::middleware::protobuf::TCPEndPoint::port
::google::protobuf::uint32 port() const
Definition: io.pb.h:1851
goby::Exception
simple exception class for goby applications
Definition: exception.h:34
goby::middleware::protobuf::TCPEndPoint::all_clients
bool all_clients() const
Definition: io.pb.h:1875
exception.h
goby::middleware::protobuf::IOData::mutable_tcp_dest
::goby::middleware::protobuf::TCPEndPoint * mutable_tcp_dest()
Definition: io.pb.h:2078
goby::middleware::io::ThreadState::SUBSCRIPTIONS_COMPLETE
@ SUBSCRIPTIONS_COMPLETE
goby::glog
util::FlexOstream glog
Access the Goby logger through this object.
goby::middleware::protobuf::TCPServerEvent::EVENT_BIND
static const Event EVENT_BIND
Definition: io.pb.h:1088
goby::middleware::to_string
std::string to_string(goby::middleware::protobuf::Layer layer)
Definition: common.h:44
goby::middleware::io::detail::TCPSession::remote_endpoint
const boost::asio::ip::tcp::endpoint & remote_endpoint()
Definition: tcp_server_interface.h:119
goby::middleware::protobuf::IOData::mutable_tcp_src
::goby::middleware::protobuf::TCPEndPoint * mutable_tcp_src()
Definition: io.pb.h:1990