NETSIM  1.0.0
Hardware-in-the-loopacousticnetworksimulator
tcp_session.h
Go to the documentation of this file.
1 // Copyright 2020:
2 // GobySoft, LLC (2017-)
3 // Massachusetts Institute of Technology (2017-)
4 // File authors:
5 // Toby Schneider <toby@gobysoft.org>
6 //
7 //
8 // This file is part of the NETSIM Libraries.
9 //
10 // The NETSIM Libraries are free software: you can redistribute them and/or modify
11 // them under the terms of the GNU Lesser General Public License as published by
12 // the Free Software Foundation, either version 2.1 of the License, or
13 // (at your option) any later version.
14 //
15 // The NETSIM Libraries are distributed in the hope that they will be useful,
16 // but WITHOUT ANY WARRANTY; without even the implied warranty of
17 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 // GNU Lesser General Public License for more details.
19 //
20 // You should have received a copy of the GNU Lesser General Public License
21 // along with NETSIM. If not, see <http://www.gnu.org/licenses/>.
22 
23 #ifndef TCP_SESSION_20180131H
24 #define TCP_SESSION_20180131H
25 
26 #include <cstdint>
27 #include <cstdlib>
28 #include <memory>
29 #include <set>
30 #include <unordered_map>
31 #include <utility>
32 #include <vector>
33 
34 #include <boost/asio.hpp>
35 
36 #include <google/protobuf/message.h>
37 
38 #include <dccl/binary.h>
39 
40 #include <goby/util/asio_compat.h>
41 
42 namespace netsim
43 {
45 {
46  public:
47  ReceiveBase() = default;
48  virtual ~ReceiveBase() = default;
49 
50  virtual void post(const std::string& pb_name, const std::string& bytes,
51  const boost::asio::ip::tcp::endpoint& remote) = 0;
52 };
53 
54 template <typename ProtobufMessage> class Receive : public ReceiveBase
55 {
56  public:
57  using CallbackType = std::function<void(const ProtobufMessage& msg,
58  const boost::asio::ip::tcp::endpoint& remote)>;
59 
60  Receive(CallbackType f) : f_(f) {}
61  void post(const std::string& pb_name, const std::string& bytes,
62  const boost::asio::ip::tcp::endpoint& remote)
63  {
64  ProtobufMessage msg;
65  msg.ParseFromString(bytes);
66  f_(msg, remote);
67  }
68 
69  private:
70  CallbackType f_;
71 };
72 
74 {
75  public:
76  using CallbackType = std::function<void(const std::string& pb_name, const std::string& bytes,
77  const boost::asio::ip::tcp::endpoint& remote)>;
79  void post(const std::string& pb_name, const std::string& bytes,
80  const boost::asio::ip::tcp::endpoint& remote)
81  {
82  f_(pb_name, bytes, remote);
83  }
84 
85  private:
86  CallbackType f_;
87 };
88 
89 class tcp_session : public std::enable_shared_from_this<tcp_session>
90 {
91  public:
92  tcp_session(std::unique_ptr<boost::asio::ip::tcp::socket> socket);
93  void start();
94 
95  // receive all messages (unparsed)
97  {
98  std::unique_ptr<ReceiveBase> rx(new ReceiveUnparsed(f));
99  rx_all_.insert(std::move(rx));
100  }
101 
102  // receive specific types (parsed)
103  template <typename ProtobufMessage>
105  {
106  std::unique_ptr<ReceiveBase> rx(new Receive<ProtobufMessage>(f));
107  rx_callbacks_.insert(
108  std::make_pair(ProtobufMessage::descriptor()->full_name(), std::move(rx)));
109  }
110 
111  void write(const google::protobuf::Message& message);
112  void disconnect();
113  bool connected() { return bool(self_); };
114 
115  boost::asio::ip::tcp::endpoint remote_endpoint()
116  {
117  if (socket_)
118  return socket_->remote_endpoint();
119  else
120  return boost::asio::ip::tcp::endpoint();
121  }
122 
123  protected:
124  std::unique_ptr<boost::asio::ip::tcp::socket>& socket() { return socket_; }
125 
126  // NETSIM|[ASCII protobuf name, e.g. goby.moos.protobuf.NodeStatus]|data (b64)\n
127  private:
128  void read();
129  void name_read();
130  void data_read(std::uint32_t size);
131 
132  private:
133  std::unique_ptr<boost::asio::ip::tcp::socket> socket_;
134  // used to indicate that we are still valid (connected)
135  // if we reset this, the server should clean up this connection
136  std::shared_ptr<netsim::tcp_session> self_;
137 
138  // protobuf full name -> Receive
139  std::unordered_multimap<std::string, std::unique_ptr<ReceiveBase>> rx_callbacks_;
140  std::set<std::unique_ptr<ReceiveBase>> rx_all_;
141 
142  boost::asio::streambuf buffer_;
143 
144  const char end_of_line_{'\n'};
145  const char delimiter_{'|'};
146  const std::string preamble_{"NETSIM"};
147 };
148 } // namespace netsim
149 
150 #endif
netsim::tcp_session::start
void start()
netsim::tcp_session::remote_endpoint
boost::asio::ip::tcp::endpoint remote_endpoint()
Definition: tcp_session.h:115
netsim::ReceiveBase::post
virtual void post(const std::string &pb_name, const std::string &bytes, const boost::asio::ip::tcp::endpoint &remote)=0
netsim::ReceiveUnparsed::CallbackType
std::function< void(const std::string &pb_name, const std::string &bytes, const boost::asio::ip::tcp::endpoint &remote)> CallbackType
Definition: tcp_session.h:77
netsim::Receive
Definition: tcp_session.h:54
netsim::tcp_session::tcp_session
tcp_session(std::unique_ptr< boost::asio::ip::tcp::socket > socket)
netsim::tcp_session::read_callback
void read_callback(typename ReceiveUnparsed::CallbackType f)
Definition: tcp_session.h:96
netsim
Definition: environment.h:30
netsim::ReceiveBase::~ReceiveBase
virtual ~ReceiveBase()=default
netsim::ReceiveUnparsed::ReceiveUnparsed
ReceiveUnparsed(CallbackType f)
Definition: tcp_session.h:78
netsim::ReceiveBase
Definition: tcp_session.h:44
netsim::Receive::post
void post(const std::string &pb_name, const std::string &bytes, const boost::asio::ip::tcp::endpoint &remote)
Definition: tcp_session.h:61
netsim::tcp_session
Definition: tcp_session.h:89
netsim::ReceiveUnparsed
Definition: tcp_session.h:73
netsim::tcp_session::read_callback
void read_callback(typename Receive< ProtobufMessage >::CallbackType f)
Definition: tcp_session.h:104
netsim::ReceiveBase::ReceiveBase
ReceiveBase()=default
netsim::Receive::CallbackType
std::function< void(const ProtobufMessage &msg, const boost::asio::ip::tcp::endpoint &remote)> CallbackType
Definition: tcp_session.h:58
netsim::tcp_session::disconnect
void disconnect()
netsim::tcp_session::write
void write(const google::protobuf::Message &message)
netsim::Receive::Receive
Receive(CallbackType f)
Definition: tcp_session.h:60
netsim::tcp_session::connected
bool connected()
Definition: tcp_session.h:113
netsim::ReceiveUnparsed::post
void post(const std::string &pb_name, const std::string &bytes, const boost::asio::ip::tcp::endpoint &remote)
Definition: tcp_session.h:79
netsim::tcp_session::socket
std::unique_ptr< boost::asio::ip::tcp::socket > & socket()
Definition: tcp_session.h:124