Note: Goby version 1 (shown here) is now considered obsolete. Please use version 2 for new projects, and consider upgrading old projects.

Goby Underwater Autonomy Project  Series: 1.1, revision: 163, released on 2013-02-06 14:23:27 -0500
util/liblinebasedcomms/connection.h
00001 // copyright 2010 t. schneider tes@mit.edu
00002 //
00003 //
00004 // This program is free software: you can redistribute it and/or modify
00005 // it under the terms of the GNU General Public License as published by
00006 // the Free Software Foundation, either version 3 of the License, or
00007 // (at your option) any later version.
00008 //
00009 // This software is distributed in the hope that it will be useful,
00010 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00011 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012 // GNU General Public License for more details.
00013 //
00014 // You should have received a copy of the GNU General Public License
00015 // along with this software.  If not, see <http://www.gnu.org/licenses/>.
00016 
00017 #ifndef ASIOLineBasedConnection20100715H
00018 #define ASIOLineBasedConnection20100715H
00019 
00020 
00021 namespace goby
00022 {
00023     namespace util
00024     {
00025 
00026 // template for type of client socket (asio::serial_port, asio::ip::tcp::socket)
00027         
00028         template<typename ASIOAsyncReadStream>
00029             class LineBasedConnection
00030         {
00031 
00032           protected:
00033             LineBasedConnection<ASIOAsyncReadStream>()
00034             {}
00035             
00036             virtual ASIOAsyncReadStream& socket () = 0;
00037             
00038             void read_start()
00039             {
00040                 async_read_until(socket(), buffer_, LineBasedInterface::delimiter_,
00041                                  boost::bind(&LineBasedConnection::read_complete,
00042                                              this,
00043                                              boost::asio::placeholders::error));
00044             }
00045             
00046             void write_start()
00047             { // Start an asynchronous write and call write_complete when it completes or fails
00048                 // don't write if it has a dest and the id doesn't match
00049                 if(!(out_.front().has_dest() && out_.front().dest() != remote_endpoint()))
00050                 {
00051                     boost::asio::async_write(socket(),
00052                                              boost::asio::buffer(out_.front().data()),
00053                                              boost::bind(&LineBasedConnection::write_complete,
00054                                                          this,
00055                                                          boost::asio::placeholders::error));
00056                 }
00057                 else
00058                 {
00059                     // discard message not for our remote endpoint
00060                     out_.pop_front();
00061                 }
00062             }
00063             
00064             void read_complete(const boost::system::error_code& error)
00065             {     
00066                 if(error) return socket_close(error);
00067 
00068                 std::istream is(&buffer_);
00069 
00070                 std::string& line = *in_datagram_.mutable_data();
00071 
00072                 if(!remote_endpoint().empty())
00073                     in_datagram_.set_src(remote_endpoint());
00074                 if(!local_endpoint().empty())
00075                     in_datagram_.set_dest(local_endpoint());
00076 
00077                 char last = LineBasedInterface::delimiter_.at(LineBasedInterface::delimiter_.length()-1);
00078                 while(!std::getline(is, line, last).eof())
00079                 {
00080                     line = extra_ + line + last;
00081                     // grab a lock on the in_ deque because the user can modify    
00082                     boost::mutex::scoped_lock lock(LineBasedInterface::in_mutex_);
00083                     
00084                     LineBasedInterface::in_.push_back(in_datagram_);
00085                     
00086                     extra_.clear();
00087                 }
00088                 
00089                 // store any remainder for the next round
00090                 if(!line.empty()) extra_ = line;
00091                 
00092                 read_start(); // start waiting for another asynchronous read again
00093             }    
00094 
00095             void write_complete(const boost::system::error_code& error)
00096             { // the asynchronous read operation has now completed or failed and returned an error
00097                 if(error) return socket_close(error);
00098                 
00099                 out_.pop_front(); // remove the completed data
00100                 if (!out_.empty()) // if there is anthing left to be written
00101                     write_start(); // then start sending the next item in the buffer
00102             }
00103 
00104             virtual void socket_close(const boost::system::error_code& error) = 0;
00105             
00106             virtual std::string local_endpoint() = 0;
00107             virtual std::string remote_endpoint() = 0;
00108             
00109             std::deque<protobuf::Datagram>& out() { return out_; }
00110             
00111             
00112           private:
00113             boost::asio::streambuf buffer_; 
00114             std::string extra_;
00115             protobuf::Datagram in_datagram_;
00116             std::deque<protobuf::Datagram> out_; // buffered write data
00117                 
00118             
00119         };
00120     }
00121 }
00122 
00123 #endif
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends