// Goby v2: mosh_relay.cpp
1 // Copyright 2009-2018 Toby Schneider (http://gobysoft.org/index.wt/people/toby)
2 // GobySoft, LLC (2013-)
3 // Massachusetts Institute of Technology (2007-2014)
4 //
5 //
6 // This file is part of the Goby Underwater Autonomy Project Binaries
7 // ("The Goby Binaries").
8 //
9 // The Goby Binaries are free software: you can redistribute them and/or modify
10 // them under the terms of the GNU General Public License as published by
11 // the Free Software Foundation, either version 2 of the License, or
12 // (at your option) any later version.
13 //
14 // The Goby Binaries are distributed in the hope that they will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 // GNU General Public License for more details.
18 //
19 // You should have received a copy of the GNU General Public License
20 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
21 
22 #include <boost/asio.hpp>
23 
24 #include "goby/common/logger.h"
25 
26 #include "goby/pb/application.h"
27 
28 #include "goby/acomms/protobuf/mosh_packet.pb.h"
29 #include "mosh_relay_config.pb.h"
30 
31 using namespace goby::common::logger;
32 
33 using boost::asio::ip::udp;
34 
35 namespace goby
36 {
37 namespace acomms
38 {
40 {
41  public:
42  Packetizer();
43  Packetizer(int src, int dest, const std::vector<char>& input);
44  const std::set<protobuf::MoshPacket>& fragments() { return fragments_; }
45 
46  bool add_fragment(const protobuf::MoshPacket& frag);
47  std::vector<char> reassemble();
48 
49  private:
50  std::set<protobuf::MoshPacket> fragments_;
51 };
52 
54 {
55  public:
57  ~MoshRelay();
58 
59  private:
60  void loop();
61  void start_udp_receive();
62  void handle_udp_receive(const boost::system::error_code& error, std::size_t bytes_transferred);
63  void handle_goby_receive(const protobuf::MoshPacket& pkt);
64  void handle_udp_send(const boost::system::error_code& error, std::size_t bytes_transferred);
65 
66  private:
68  boost::asio::io_service io_service_;
69  udp::socket socket_;
70 
71  enum
72  {
73  MOSH_UDP_PAYLOAD_SIZE = 1300
74  };
75 
76  udp::endpoint remote_endpoint_;
77  std::vector<char> recv_buffer_;
78 
79  typedef int ModemId;
80  std::map<ModemId, Packetizer> packets_;
81 };
82 
83 namespace protobuf
84 {
85 inline bool operator<(const MoshPacket& a, const MoshPacket& b)
86 {
87  return a.frag_num() < b.frag_num();
88 }
89 } // namespace protobuf
90 
91 } // namespace acomms
92 } // namespace goby
93 
94 const int MOSH_FRAGMENT_SIZE = goby::acomms::protobuf::MoshPacket::descriptor()
95  ->FindFieldByName("fragment")
96  ->options()
97  .GetExtension(dccl::field)
98  .max_length();
99 
100 void test_packetizer(int size)
101 {
102  std::vector<char> in(size, 0);
103  for (int i = 0; i < size; ++i) in[i] = i % 256;
104  goby::acomms::Packetizer pi(1, 2, in), po;
105  const std::set<goby::acomms::protobuf::MoshPacket>& f = pi.fragments();
106  bool ready = false;
107  for (std::set<goby::acomms::protobuf::MoshPacket>::const_iterator it = f.begin(), end = f.end();
108  it != end; ++it)
109  ready = po.add_fragment(*it);
110  assert(ready);
111 
112  assert(po.reassemble() == in);
113 }
114 
115 int main(int argc, char* argv[])
116 {
117  test_packetizer(MOSH_FRAGMENT_SIZE * 4);
118  test_packetizer(1300);
119  test_packetizer(MOSH_FRAGMENT_SIZE - 10);
120  test_packetizer(MOSH_FRAGMENT_SIZE * 2 - 5);
121 
123  goby::run<goby::acomms::MoshRelay>(argc, argv, &cfg);
124 }
125 
126 using goby::glog;
127 
128 goby::acomms::MoshRelay::MoshRelay(protobuf::MoshRelayConfig* cfg)
129  : goby::pb::Application(cfg), cfg_(*cfg), socket_(io_service_),
130  recv_buffer_(MOSH_UDP_PAYLOAD_SIZE, 0)
131 {
132  glog.is(DEBUG1) && glog << cfg_.DebugString() << std::endl;
133 
134  socket_.open(udp::v4());
135  if (cfg_.bind())
136  {
137  socket_.bind(udp::endpoint(boost::asio::ip::address::from_string(cfg_.ip_address()),
138  cfg_.udp_port()));
139  }
140  else
141  {
142  remote_endpoint_ = udp::endpoint(boost::asio::ip::address::from_string(cfg_.ip_address()),
143  cfg_.udp_port());
144  }
145  start_udp_receive();
146 
147  subscribe(&MoshRelay::handle_goby_receive, this,
148  "QueueRx" + goby::util::as<std::string>(cfg_.src_modem_id()));
149 }
150 
151 goby::acomms::MoshRelay::~MoshRelay() {}
152 
153 void goby::acomms::MoshRelay::loop() { io_service_.poll(); }
154 
155 void goby::acomms::MoshRelay::start_udp_receive()
156 {
157  socket_.async_receive_from(boost::asio::buffer(recv_buffer_), remote_endpoint_,
158  boost::bind(&MoshRelay::handle_udp_receive, this,
159  boost::asio::placeholders::error,
160  boost::asio::placeholders::bytes_transferred));
161 }
162 
163 void goby::acomms::MoshRelay::handle_udp_receive(const boost::system::error_code& error,
164  std::size_t bytes_transferred)
165 {
166  if (!error || error == boost::asio::error::message_size)
167  {
168  glog.is(DEBUG1) && glog << remote_endpoint_ << ": " << bytes_transferred << " Bytes"
169  << std::endl;
170 
172  cfg_.src_modem_id(), cfg_.dest_modem_id(),
173  std::vector<char>(recv_buffer_.begin(), recv_buffer_.begin() + bytes_transferred));
174  const std::set<goby::acomms::protobuf::MoshPacket>& f = p.fragments();
175  for (std::set<goby::acomms::protobuf::MoshPacket>::const_iterator it = f.begin(),
176  end = f.end();
177  it != end; ++it)
178  publish(*it, "QueuePush" + goby::util::as<std::string>(cfg_.src_modem_id()));
179 
180  start_udp_receive();
181  }
182 }
183 
184 void goby::acomms::MoshRelay::handle_goby_receive(const protobuf::MoshPacket& packet)
185 {
186  glog.is(DEBUG1) && glog << "> " << packet.ShortDebugString() << std::endl;
187 
188  if (packet.dest() == (int)cfg_.src_modem_id() && packet.src() == (int)cfg_.dest_modem_id())
189  {
190  if (packets_[packet.src()].add_fragment(packet))
191  {
192  socket_.async_send_to(
193  boost::asio::buffer(packets_[packet.src()].reassemble()), remote_endpoint_,
194  boost::bind(&MoshRelay::handle_udp_send, this, boost::asio::placeholders::error,
195  boost::asio::placeholders::bytes_transferred));
196  packets_.erase(packet.src());
197  }
198  }
199 }
200 
201 void goby::acomms::MoshRelay::handle_udp_send(const boost::system::error_code& /*error*/,
202  std::size_t /*bytes_transferred*/)
203 {
204 }
205 
206 goby::acomms::Packetizer::Packetizer() {}
207 
208 goby::acomms::Packetizer::Packetizer(int src, int dest, const std::vector<char>& input)
209 {
211  packet.set_src(src);
212  packet.set_dest(dest);
213 
214  for (int i = 0, n = (input.size() + MOSH_FRAGMENT_SIZE - 1) / MOSH_FRAGMENT_SIZE; i < n; ++i)
215  {
216  packet.set_frag_num(i);
217  packet.set_frag_len(
218  std::min(MOSH_FRAGMENT_SIZE, (int)(input.size() - i * MOSH_FRAGMENT_SIZE)));
219  packet.set_is_last_frag(i + 1 == n);
220  std::string* frag = packet.mutable_fragment();
221  frag->resize(MOSH_FRAGMENT_SIZE);
222 
223  std::vector<char>::const_iterator begin = input.begin() + i * MOSH_FRAGMENT_SIZE,
224  end = begin + packet.frag_len();
225  std::copy(begin, end, frag->begin());
226 
227  glog.is(DEBUG1) && glog << packet.DebugString() << std::endl;
228 
229  fragments_.insert(packet);
230  }
231 }
232 
233 bool goby::acomms::Packetizer::add_fragment(const protobuf::MoshPacket& frag)
234 {
235  fragments_.insert(frag);
236 
237  // packet loss
238  if (frag.is_last_frag() && fragments_.size() != frag.frag_num() + 1)
239  {
240  fragments_.clear();
241  glog.is(WARN) && glog << "Missed fragment" << std::endl;
242  return false;
243  }
244 
245  return frag.is_last_frag();
246 }
247 
248 std::vector<char> goby::acomms::Packetizer::reassemble()
249 {
250  if (!fragments_.size())
251  return std::vector<char>();
252 
253  std::vector<char> out(MOSH_FRAGMENT_SIZE * fragments_.size(), 0);
254 
255  std::vector<char>::iterator oit = out.begin();
256 
257  for (std::set<protobuf::MoshPacket>::const_iterator it = fragments_.begin(),
258  end = fragments_.end();
259  it != end; ++it)
260  { oit = std::copy(it->fragment().begin(), it->fragment().begin() + it->frag_len(), oit); }
261  out.resize(out.size() - (MOSH_FRAGMENT_SIZE - fragments_.rbegin()->frag_len()));
262  return out;
263 }
// Documentation cross-references for symbols used in this file:
// - Base class provided for users to generate applications that participate in the Goby
//   publish/subscribe... (Definition: application.h:49)
// - common::FlexOstream glog: Access the Goby logger through this object.
// - The global namespace for the Goby project.