Goby v2
connection.h
1 // Copyright 2009-2018 Toby Schneider (http://gobysoft.org/index.wt/people/toby)
2 // GobySoft, LLC (2013-)
3 // Massachusetts Institute of Technology (2007-2014)
4 // Community contributors (see AUTHORS file)
5 //
6 //
7 // This file is part of the Goby Underwater Autonomy Project Libraries
8 // ("The Goby Libraries").
9 //
10 // The Goby Libraries are free software: you can redistribute them and/or modify
11 // them under the terms of the GNU Lesser General Public License as published by
12 // the Free Software Foundation, either version 2.1 of the License, or
13 // (at your option) any later version.
14 //
15 // The Goby Libraries are distributed in the hope that they will be useful,
16 // but WITHOUT ANY WARRANTY; without even the implied warranty of
17 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 // GNU Lesser General Public License for more details.
19 //
20 // You should have received a copy of the GNU Lesser General Public License
21 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
22 
23 #ifndef ASIOLineBasedConnection20100715H
24 #define ASIOLineBasedConnection20100715H
25 
26 #include "goby/common/logger.h"
27 #include "goby/common/time.h"
28 #include "interface.h"
29 
30 namespace goby
31 {
32 namespace util
33 {
34 // template for type of client socket (asio::serial_port, asio::ip::tcp::socket)
35 
36 template <typename ASIOAsyncReadStream> class LineBasedConnection
37 {
38  public:
39  LineBasedConnection<ASIOAsyncReadStream>(LineBasedInterface* interface) : interface_(interface)
40  {
41  }
43 
44  virtual ASIOAsyncReadStream& socket() = 0;
45 
46  void read_start()
47  {
48  async_read_until(socket(), buffer_, interface_->delimiter(),
49  boost::bind(&LineBasedConnection::read_complete, this,
50  boost::asio::placeholders::error));
51  }
52 
53  void write_start()
54  { // Start an asynchronous write and call write_complete when it completes or fails
55  // don't write if it has a dest and the id doesn't match
56  if (!(out_.front().has_dest() && out_.front().dest() != remote_endpoint()))
57  {
58  boost::asio::async_write(socket(), boost::asio::buffer(out_.front().data()),
59  boost::bind(&LineBasedConnection::write_complete, this,
60  boost::asio::placeholders::error));
61  }
62  else
63  {
64  // discard message not for our remote endpoint
65  out_.pop_front();
66  }
67  }
68 
69  void read_complete(const boost::system::error_code& error)
70  {
71  if (error == boost::asio::error::operation_aborted)
72  {
73  return;
74  }
75  else if (error)
76  {
77  // goby::glog.is(goby::common::logger::DEBUG2, goby::common::logger_lock::lock) &&
78  // goby::glog << "Error on reading from socket: " << error.message() << std::endl << unlock;
79  return socket_close(error);
80  }
81 
82  std::istream is(&buffer_);
83  std::string& line = *in_datagram_.mutable_data();
84 
85  if (!remote_endpoint().empty())
86  in_datagram_.set_src(remote_endpoint());
87  if (!local_endpoint().empty())
88  in_datagram_.set_dest(local_endpoint());
89 
90  in_datagram_.set_time(goby::common::goby_time<double>());
91  char last = interface_->delimiter().at(interface_->delimiter().length() - 1);
92  std::getline(is, line, last);
93 
94  {
95  boost::mutex::scoped_lock lock(interface_->in_mutex());
96  interface_->in().push_back(in_datagram_);
97  }
98  read_start(); // start waiting for another asynchronous read again
99  }
100 
101  void write_complete(const boost::system::error_code& error)
102  { // the asynchronous read operation has now completed or failed and returned an error
103  if (error == boost::asio::error::operation_aborted)
104  {
105  return;
106  }
107  else if (error)
108  {
109  // goby::glog.is(goby::common::logger::DEBUG2, goby::common::logger_lock::lock) &&
110  // goby::glog << "Error on writing from socket: " << error.message() << std::endl << unlock;
111 
112  return socket_close(error);
113  }
114 
115  out_.pop_front(); // remove the completed data
116  if (!out_.empty()) // if there is anthing left to be written
117  write_start(); // then start sending the next item in the buffer
118  }
119 
120  virtual void socket_close(const boost::system::error_code& error) = 0;
121 
122  virtual std::string local_endpoint() = 0;
123  virtual std::string remote_endpoint() = 0;
124 
125  std::deque<protobuf::Datagram>& out() { return out_; }
126 
127  private:
128  LineBasedInterface* interface_;
129  boost::asio::streambuf buffer_;
130  protobuf::Datagram in_datagram_;
131  std::deque<protobuf::Datagram> out_; // buffered write data
132 };
133 } // namespace util
134 } // namespace goby
135 
136 #endif
double goby_time< double >()
Returns current UTC time as seconds and fractional seconds since 1970-01-01 00:00:00.
Definition: time.h:130
The global namespace for the Goby project.
basic interface class for all the derived serial (and networking mimics) line-based nodes (serial...
Definition: interface.h:44