NETSIM 1.2.0
Hardware-in-the-loop acoustic network simulator
 
Loading...
Searching...
No Matches
tcp_session.h
Go to the documentation of this file.
1// Copyright 2020:
2// GobySoft, LLC (2017-)
3// Massachusetts Institute of Technology (2017-)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6//
7//
8// This file is part of the NETSIM Libraries.
9//
10// The NETSIM Libraries are free software: you can redistribute them and/or modify
11// them under the terms of the GNU Lesser General Public License as published by
12// the Free Software Foundation, either version 2.1 of the License, or
13// (at your option) any later version.
14//
15// The NETSIM Libraries are distributed in the hope that they will be useful,
16// but WITHOUT ANY WARRANTY; without even the implied warranty of
17// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18// GNU Lesser General Public License for more details.
19//
20// You should have received a copy of the GNU Lesser General Public License
21// along with NETSIM. If not, see <http://www.gnu.org/licenses/>.
22
23#ifndef TCP_SESSION_20180131H
24#define TCP_SESSION_20180131H
25
#include <cstdint>
#include <cstdlib>
#include <functional>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include <boost/asio.hpp>

#include <google/protobuf/message.h>

#include <dccl/binary.h>

#include <goby/util/asio_compat.h>
41
42namespace netsim
43{
45{
46 public:
47 ReceiveBase() = default;
48 virtual ~ReceiveBase() = default;
49
50 virtual void post(const std::string& pb_name, const std::string& bytes,
51 const boost::asio::ip::tcp::endpoint& remote) = 0;
52};
53
54template <typename ProtobufMessage> class Receive : public ReceiveBase
55{
56 public:
57 using CallbackType = std::function<void(const ProtobufMessage& msg,
58 const boost::asio::ip::tcp::endpoint& remote)>;
59
60 Receive(CallbackType f) : f_(f) {}
61 void post(const std::string& pb_name, const std::string& bytes,
62 const boost::asio::ip::tcp::endpoint& remote)
63 {
64 ProtobufMessage msg;
65 msg.ParseFromString(bytes);
66 f_(msg, remote);
67 }
68
69 private:
70 CallbackType f_;
71};
72
74{
75 public:
76 using CallbackType = std::function<void(const std::string& pb_name, const std::string& bytes,
77 const boost::asio::ip::tcp::endpoint& remote)>;
79 void post(const std::string& pb_name, const std::string& bytes,
80 const boost::asio::ip::tcp::endpoint& remote)
81 {
82 f_(pb_name, bytes, remote);
83 }
84
85 private:
86 CallbackType f_;
87};
88
89class tcp_session : public std::enable_shared_from_this<tcp_session>
90{
91 public:
92 tcp_session(std::unique_ptr<boost::asio::ip::tcp::socket> socket);
93 void start();
94
95 // receive all messages (unparsed)
97 {
98 std::unique_ptr<ReceiveBase> rx(new ReceiveUnparsed(f));
99 rx_all_.insert(std::move(rx));
100 }
101
102 // receive specific types (parsed)
103 template <typename ProtobufMessage>
105 {
106 std::unique_ptr<ReceiveBase> rx(new Receive<ProtobufMessage>(f));
107 rx_callbacks_.insert(
108 std::make_pair(ProtobufMessage::descriptor()->full_name(), std::move(rx)));
109 }
110
111 void write(const google::protobuf::Message& message);
113 bool connected() { return bool(self_); };
114
115 boost::asio::ip::tcp::endpoint remote_endpoint()
116 {
117 if (socket_)
118 return socket_->remote_endpoint();
119 else
120 return boost::asio::ip::tcp::endpoint();
121 }
122
123 protected:
124 std::unique_ptr<boost::asio::ip::tcp::socket>& socket() { return socket_; }
125
126 // NETSIM|[ASCII protobuf name, e.g. goby.moos.protobuf.NodeStatus]|data (b64)\n
127 private:
128 void read();
129 void name_read();
130 void data_read(std::uint32_t size);
131
132 private:
133 std::unique_ptr<boost::asio::ip::tcp::socket> socket_;
134 // used to indicate that we are still valid (connected)
135 // if we reset this, the server should clean up this connection
136 std::shared_ptr<netsim::tcp_session> self_;
137
138 // protobuf full name -> Receive
139 std::unordered_multimap<std::string, std::unique_ptr<ReceiveBase>> rx_callbacks_;
140 std::set<std::unique_ptr<ReceiveBase>> rx_all_;
141
142 boost::asio::streambuf buffer_;
143
144 const char end_of_line_{'\n'};
145 const char delimiter_{'|'};
146 const std::string preamble_{"NETSIM"};
147};
148} // namespace netsim
149
150#endif
virtual void post(const std::string &pb_name, const std::string &bytes, const boost::asio::ip::tcp::endpoint &remote)=0
virtual ~ReceiveBase()=default
void post(const std::string &pb_name, const std::string &bytes, const boost::asio::ip::tcp::endpoint &remote)
Definition tcp_session.h:61
std::function< void(const ProtobufMessage &msg, const boost::asio::ip::tcp::endpoint &remote)> CallbackType
Definition tcp_session.h:58
Receive(CallbackType f)
Definition tcp_session.h:60
ReceiveUnparsed(CallbackType f)
Definition tcp_session.h:78
void post(const std::string &pb_name, const std::string &bytes, const boost::asio::ip::tcp::endpoint &remote)
Definition tcp_session.h:79
std::function< void(const std::string &pb_name, const std::string &bytes, const boost::asio::ip::tcp::endpoint &remote)> CallbackType
Definition tcp_session.h:77
std::unique_ptr< boost::asio::ip::tcp::socket > & socket()
void read_callback(typename ReceiveUnparsed::CallbackType f)
Definition tcp_session.h:96
void write(const google::protobuf::Message &message)
boost::asio::ip::tcp::endpoint remote_endpoint()
tcp_session(std::unique_ptr< boost::asio::ip::tcp::socket > socket)
void read_callback(typename Receive< ProtobufMessage >::CallbackType f)