Goby v2
pb_modem_driver.cpp
1 // Copyright 2009-2018 Toby Schneider (http://gobysoft.org/index.wt/people/toby)
2 // GobySoft, LLC (2013-)
3 // Massachusetts Institute of Technology (2007-2014)
4 // Community contributors (see AUTHORS file)
5 //
6 //
7 // This file is part of the Goby Underwater Autonomy Project Libraries
8 // ("The Goby Libraries").
9 //
10 // The Goby Libraries are free software: you can redistribute them and/or modify
11 // them under the terms of the GNU Lesser General Public License as published by
12 // the Free Software Foundation, either version 2.1 of the License, or
13 // (at your option) any later version.
14 //
15 // The Goby Libraries are distributed in the hope that they will be useful,
16 // but WITHOUT ANY WARRANTY; without even the implied warranty of
17 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 // GNU Lesser General Public License for more details.
19 //
20 // You should have received a copy of the GNU Lesser General Public License
21 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
22 
23 #include "pb_modem_driver.h"
24 
25 #include "goby/acomms/modemdriver/driver_exception.h"
26 #include "goby/acomms/modemdriver/mm_driver.h"
27 #include "goby/common/logger.h"
28 #include "goby/util/binary.h"
29 
30 using goby::glog;
31 using goby::util::hex_decode;
32 using goby::util::hex_encode;
33 using namespace goby::common::logger;
35 
36 goby::pb::PBDriver::PBDriver(goby::common::ZeroMQService* zeromq_service)
37  : StaticProtobufNode(zeromq_service), zeromq_service_(zeromq_service),
38  last_send_time_(goby_time<uint64>()), request_socket_id_(0), query_interval_seconds_(1),
39  reset_interval_seconds_(120), waiting_for_reply_(false), next_frame_(0)
40 {
41  on_receipt<acomms::protobuf::StoreServerResponse>(0, &PBDriver::handle_response, this);
42 }
43 
45 {
46  driver_cfg_ = cfg;
47  request_.set_modem_id(driver_cfg_.modem_id());
48 
49  service_cfg_.add_socket()->CopyFrom(driver_cfg_.GetExtension(PBDriverConfig::request_socket));
50  zeromq_service_->set_cfg(service_cfg_);
51 
52  request_socket_id_ = driver_cfg_.GetExtension(PBDriverConfig::request_socket).socket_id();
53 
54  query_interval_seconds_ = driver_cfg_.GetExtension(PBDriverConfig::query_interval_seconds);
55 
56  reset_interval_seconds_ = driver_cfg_.GetExtension(PBDriverConfig::reset_interval_seconds);
57 }
58 
60 
63 {
64  switch (orig_msg.type())
65  {
66  case acomms::protobuf::ModemTransmission::DATA:
67  {
68  // buffer the message
70  signal_modify_transmission(&msg);
71 
72  if (driver_cfg_.modem_id() == msg.src())
73  {
74  if (!msg.has_frame_start())
75  msg.set_frame_start(next_frame_);
76 
77  // this is our transmission
78  if (msg.rate() < driver_cfg_.ExtensionSize(PBDriverConfig::rate_to_bytes))
79  msg.set_max_frame_bytes(
80  driver_cfg_.GetExtension(PBDriverConfig::rate_to_bytes, msg.rate()));
81  else
82  msg.set_max_frame_bytes(
83  driver_cfg_.GetExtension(PBDriverConfig::max_frame_size));
84 
85  if (msg.rate() < driver_cfg_.ExtensionSize(PBDriverConfig::rate_to_frames))
86  msg.set_max_num_frames(
87  driver_cfg_.GetExtension(PBDriverConfig::rate_to_frames, msg.rate()));
88 
89  // no data given to us, let's ask for some
90  if (msg.frame_size() < (int)msg.max_num_frames())
91  ModemDriverBase::signal_data_request(&msg);
92 
93  next_frame_ += msg.frame_size();
94 
95  // don't send an empty message
96  if (msg.frame_size() && msg.frame(0).size())
97  {
98  request_.add_outbox()->CopyFrom(msg);
99  }
100  }
101  else
102  {
103  // send thirdparty "poll"
104  msg.SetExtension(PBDriverTransmission::poll_src, msg.src());
105  msg.SetExtension(PBDriverTransmission::poll_dest, msg.dest());
106 
107  msg.set_dest(msg.src());
108  msg.set_src(driver_cfg_.modem_id());
109 
110  msg.set_type(goby::acomms::protobuf::ModemTransmission::DRIVER_SPECIFIC);
111  msg.SetExtension(PBDriverTransmission::type, PBDriverTransmission::PB_DRIVER_POLL);
112 
113  request_.add_outbox()->CopyFrom(msg);
114  }
115  }
116  break;
117  default:
118  glog.is(DEBUG1) && glog << group(glog_out_group()) << warn
119  << "Not initiating transmission because we were given an "
120  "invalid transmission type for the base Driver:"
121  << orig_msg.DebugString() << std::endl;
122  break;
123  }
124 }
125 
127 {
128  while (zeromq_service_->poll(0)) {}
129 
130  // call in with our outbox
131  if (!waiting_for_reply_ && request_.IsInitialized() &&
132  goby_time<uint64>() >
133  last_send_time_ + 1000000 * static_cast<uint64>(query_interval_seconds_))
134  {
135  static int request_id = 0;
136  request_.set_request_id(request_id++);
137  glog.is(DEBUG1) && glog << group(glog_out_group()) << "Sending to server." << std::endl;
138  glog.is(DEBUG2) && glog << group(glog_out_group()) << "Outbox: " << request_.DebugString()
139  << std::flush;
140  send(request_, request_socket_id_);
141  last_send_time_ = goby_time<uint64>();
142  request_.clear_outbox();
143  waiting_for_reply_ = true;
144  }
145  else if (waiting_for_reply_ &&
146  goby_time<uint64>() > last_send_time_ + 1e6 * reset_interval_seconds_)
147  {
148  glog.is(DEBUG1) && glog << group(glog_out_group()) << warn << "No response in "
149  << reset_interval_seconds_ << " seconds, resetting socket."
150  << std::endl;
151  zeromq_service_->close_all();
152  zeromq_service_->set_cfg(service_cfg_);
153  waiting_for_reply_ = false;
154  }
155 }
156 
157 void goby::pb::PBDriver::handle_response(const acomms::protobuf::StoreServerResponse& response)
158 {
159  glog.is(DEBUG1) && glog << group(glog_in_group()) << "Received response in "
160  << (goby_time<uint64_t>() - last_send_time_) / 1.0e6 << " seconds."
161  << std::endl;
162 
163  glog.is(DEBUG2) && glog << group(glog_in_group()) << "Inbox: " << response.DebugString()
164  << std::flush;
165 
166  for (int i = 0, n = response.inbox_size(); i < n; ++i)
167  {
168  const acomms::protobuf::ModemTransmission& msg = response.inbox(i);
169 
170  // poll for us
171  if (msg.type() == goby::acomms::protobuf::ModemTransmission::DRIVER_SPECIFIC &&
172  msg.GetExtension(PBDriverTransmission::type) == PBDriverTransmission::PB_DRIVER_POLL &&
173  msg.GetExtension(PBDriverTransmission::poll_src) == driver_cfg_.modem_id())
174  {
176  data_msg.set_type(goby::acomms::protobuf::ModemTransmission::DATA);
177  data_msg.set_src(msg.GetExtension(PBDriverTransmission::poll_src));
178  data_msg.set_dest(msg.GetExtension(PBDriverTransmission::poll_dest));
179 
180  data_msg.ClearExtension(PBDriverTransmission::type);
181  data_msg.ClearExtension(PBDriverTransmission::poll_dest);
182  data_msg.ClearExtension(PBDriverTransmission::poll_src);
183 
184  handle_initiate_transmission(data_msg);
185  }
186  else
187  {
188  // ack any packets
189  if (msg.dest() == driver_cfg_.modem_id() &&
190  msg.type() == acomms::protobuf::ModemTransmission::DATA && msg.ack_requested())
191  {
192  acomms::protobuf::ModemTransmission& ack = *request_.add_outbox();
193  ack.set_type(goby::acomms::protobuf::ModemTransmission::ACK);
194  ack.set_src(msg.dest());
195  ack.set_dest(msg.src());
196  for (int i = msg.frame_start(), n = msg.frame_size() + msg.frame_start(); i < n;
197  ++i)
198  ack.add_acked_frame(i);
199  }
200 
201  signal_receive(msg);
202  }
203  }
204 
205  waiting_for_reply_ = false;
206 }
void startup(const acomms::protobuf::DriverConfig &cfg)
Starts the modem driver. Must be called before poll().
void do_work()
Allows the modem driver to do its work.
ReturnType goby_time()
Returns current UTC time as a boost::posix_time::ptime.
Definition: time.h:104
void handle_initiate_transmission(const acomms::protobuf::ModemTransmission &m)
Virtual initiate_transmission method. Typically connected to MACManager::signal_initiate_transmission...
common::FlexOstream glog
Access the Goby logger through this object.
google::protobuf::uint64 uint64
an unsigned 64 bit integer
void shutdown()
Shuts down the modem driver.