Goby v2
bridge.cpp
1 // Copyright 2009-2018 Toby Schneider (http://gobysoft.org/index.wt/people/toby)
2 // GobySoft, LLC (2013-)
3 // Massachusetts Institute of Technology (2007-2014)
4 //
5 //
6 // This file is part of the Goby Underwater Autonomy Project Binaries
7 // ("The Goby Binaries").
8 //
9 // The Goby Binaries are free software: you can redistribute them and/or modify
10 // them under the terms of the GNU General Public License as published by
11 // the Free Software Foundation, either version 2 of the License, or
12 // (at your option) any later version.
13 //
14 // The Goby Binaries are distributed in the hope that they will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 // GNU General Public License for more details.
18 //
19 // You should have received a copy of the GNU General Public License
20 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
21 
22 #include "goby/common/logger.h"
23 #include "goby/common/logger/term_color.h"
24 #include "goby/common/zeromq_service.h"
25 
26 #include "goby/pb/application.h"
27 
28 #include "goby/acomms/amac.h"
29 #include "goby/acomms/queue.h"
30 #include "goby/acomms/route.h"
31 
32 #include "goby/acomms/bind.h"
33 #include "goby/acomms/connect.h"
34 
35 #include "goby/acomms/protobuf/queue.pb.h"
36 
37 #include "goby/acomms/protobuf/file_transfer.pb.h"
38 #include "goby/acomms/protobuf/mm_driver.pb.h"
39 #include "goby/acomms/protobuf/modem_driver_status.pb.h"
40 #include "goby/acomms/protobuf/mosh_packet.pb.h"
41 #include "goby/acomms/protobuf/time_update.pb.h"
42 
43 #include "bridge_config.pb.h"
44 
45 using namespace goby::common::logger;
46 
47 namespace goby
48 {
49 namespace acomms
50 {
52 {
53  public:
55  ~Bridge();
56 
57  private:
58  void loop();
59 
60  void handle_link_ack(const protobuf::ModemTransmission& ack_msg,
61  const google::protobuf::Message& orig_msg, QueueManager* from_queue);
62 
63  void handle_queue_receive(const google::protobuf::Message& msg, QueueManager* from_queue);
64 
65  void handle_modem_receive(const goby::acomms::protobuf::ModemTransmission& message,
66  QueueManager* in_queue);
67 
68  void handle_external_push(boost::shared_ptr<google::protobuf::Message> msg,
69  QueueManager* in_queue)
70  {
71  try
72  {
73  in_queue->push_message(*msg);
74  }
75  catch (std::exception& e)
76  {
77  glog.is(WARN) && glog << "Failed to push message: " << e.what() << std::endl;
78  }
79  }
80 
81  void handle_initiate_transmission(const protobuf::ModemTransmission& m, int subnet);
82 
83  void handle_data_request(const protobuf::ModemTransmission& m, int subnet);
84 
85  void handle_driver_status(const protobuf::ModemDriverStatus& m, int subnet);
86 
87  void generate_hw_ctl_network_ack(QueueManager* in_queue,
88  goby::acomms::protobuf::NetworkAck::AckType type);
89  void generate_time_update_network_ack(QueueManager* in_queue,
90  goby::acomms::protobuf::NetworkAck::AckType type);
91 
92  private:
94 
95  std::vector<boost::shared_ptr<QueueManager> > q_managers_;
96  std::vector<boost::shared_ptr<MACManager> > mac_managers_;
97 
98  RouteManager r_manager_;
99 
100  boost::shared_ptr<micromodem::protobuf::HardwareControlCommand> pending_hw_ctl_;
101  boost::shared_ptr<goby::acomms::protobuf::TimeUpdateResponse> pending_time_update_;
102  goby::uint64 time_update_request_time_;
103 };
104 } // namespace acomms
105 } // namespace goby
106 
107 int main(int argc, char* argv[])
108 {
110  goby::run<goby::acomms::Bridge>(argc, argv, &cfg);
111 }
112 
113 using goby::glog;
114 
115 goby::acomms::Bridge::Bridge(protobuf::BridgeConfig* cfg)
116  : Application(cfg), DynamicProtobufNode(&Application::zeromq_service()), cfg_(*cfg),
117  time_update_request_time_(0)
118 {
119  glog.is(DEBUG1) && glog << cfg_.DebugString() << std::endl;
120 
121  goby::acomms::DCCLCodec::get()->set_cfg(cfg->dccl_cfg());
122 
123  // load all shared libraries
124  for (int i = 0, n = cfg_.load_shared_library_size(); i < n; ++i)
125  {
126  glog.is(DEBUG1) && glog << "Loading shared library: " << cfg_.load_shared_library(i)
127  << std::endl;
128 
129  void* handle =
130  goby::util::DynamicProtobufManager::load_from_shared_lib(cfg_.load_shared_library(i));
131 
132  if (!handle)
133  {
134  glog.is(DIE) && glog << "Failed ... check path provided or add to /etc/ld.so.conf "
135  << "or LD_LIBRARY_PATH" << std::endl;
136  }
137 
138  glog.is(DEBUG1) && glog << "Loading shared library dccl codecs." << std::endl;
139 
140  goby::acomms::DCCLCodec::get()->load_shared_library_codecs(handle);
141  }
142 
143  // load all .proto files
144  goby::util::DynamicProtobufManager::enable_compilation();
145  for (int i = 0, n = cfg_.load_proto_file_size(); i < n; ++i)
146  {
147  glog.is(DEBUG1) && glog << "Loading protobuf file: " << cfg_.load_proto_file(i)
148  << std::endl;
149 
150  if (!goby::util::DynamicProtobufManager::load_from_proto_file(cfg_.load_proto_file(i)))
151  glog.is(DIE) && glog << "Failed to load file." << std::endl;
152  }
153 
154  r_manager_.set_cfg(cfg_.route_cfg());
155  q_managers_.resize(cfg_.subnet_size());
156  mac_managers_.resize(cfg_.subnet_size());
157  for (int i = 0, n = cfg_.subnet_size(); i < n; ++i)
158  {
159  q_managers_[i].reset(new QueueManager);
160  mac_managers_[i].reset(new MACManager);
161 
162  goby::acomms::protobuf::QueueManagerConfig qcfg = cfg_.subnet(i).queue_cfg();
163  q_managers_[i]->set_cfg(qcfg);
164 
165  mac_managers_[i]->startup(cfg_.subnet(i).mac_cfg());
166 
167  goby::acomms::bind(*q_managers_[i], r_manager_);
168 
170  &(q_managers_[i]->signal_ack),
171  boost::bind(&Bridge::handle_link_ack, this, _1, _2, q_managers_[i].get()));
172 
174  &(q_managers_[i]->signal_receive),
175  boost::bind(&Bridge::handle_queue_receive, this, _1, q_managers_[i].get()));
176 
177  Application::subscribe<goby::acomms::protobuf::ModemTransmission>(
178  boost::bind(&Bridge::handle_modem_receive, this, _1, q_managers_[i].get()),
179  "Rx" + goby::util::as<std::string>(qcfg.modem_id()));
180 
181  DynamicProtobufNode::subscribe(
182  goby::common::PubSubNodeWrapperBase::SOCKET_SUBSCRIBE,
183  boost::bind(&Bridge::handle_external_push, this, _1, q_managers_[i].get()),
184  "QueuePush" + goby::util::as<std::string>(qcfg.modem_id()));
185 
186  Application::subscribe<goby::acomms::protobuf::ModemTransmission>(
187  boost::bind(&Bridge::handle_data_request, this, _1, i),
188  "DataRequest" + goby::util::as<std::string>(qcfg.modem_id()));
189 
190  Application::subscribe<goby::acomms::protobuf::ModemDriverStatus>(
191  boost::bind(&Bridge::handle_driver_status, this, _1, i),
192  "Status" + goby::util::as<std::string>(qcfg.modem_id()));
193 
194  goby::acomms::connect(&mac_managers_[i]->signal_initiate_transmission,
195  boost::bind(&Bridge::handle_initiate_transmission, this, _1, i));
196  }
197 }
198 
199 goby::acomms::Bridge::~Bridge() {}
200 
201 void goby::acomms::Bridge::loop()
202 {
203  for (std::vector<boost::shared_ptr<QueueManager> >::iterator it = q_managers_.begin(),
204  end = q_managers_.end();
205  it != end; ++it)
206  { (*it)->do_work(); }
207 
208  for (std::vector<boost::shared_ptr<MACManager> >::iterator it = mac_managers_.begin(),
209  end = mac_managers_.end();
210  it != end; ++it)
211  { (*it)->do_work(); } goby::uint64 now = goby::common::goby_time<goby::uint64>();
212  if (pending_hw_ctl_ && (pending_hw_ctl_->time() + cfg_.special_command_ttl() * 1000000 < now))
213  {
214  glog.is(VERBOSE) && glog << "HardwareControlCommand expired." << std::endl;
215 
216  generate_hw_ctl_network_ack(q_managers_.at(0).get(),
217  goby::acomms::protobuf::NetworkAck::EXPIRE);
218  pending_hw_ctl_.reset();
219  }
220 
221  if (pending_time_update_ &&
222  (pending_time_update_->time() + cfg_.special_command_ttl() * 1000000 < now))
223  {
224  glog.is(VERBOSE) && glog << "TimeUpdateRequest expired." << std::endl;
225 
226  generate_time_update_network_ack(q_managers_.at(0).get(),
227  goby::acomms::protobuf::NetworkAck::EXPIRE);
228  pending_time_update_.reset();
229  }
230 }
231 
232 void goby::acomms::Bridge::handle_queue_receive(const google::protobuf::Message& msg,
233  QueueManager* from_queue)
234 {
235  publish(msg, "QueueRx" + goby::util::as<std::string>(from_queue->modem_id()));
236 
237  // handle various command messages
238  if (msg.GetDescriptor() == goby::acomms::protobuf::RouteCommand::descriptor())
239  {
241  route_cmd.CopyFrom(msg);
242  glog.is(VERBOSE) && glog << "Received RouteCommand: " << msg.DebugString() << std::endl;
243  goby::acomms::protobuf::RouteManagerConfig cfg = cfg_.route_cfg();
244  cfg.mutable_route()->CopyFrom(route_cmd.new_route());
245  r_manager_.set_cfg(cfg);
246  }
247  else if (msg.GetDescriptor() == micromodem::protobuf::HardwareControlCommand::descriptor())
248  {
249  pending_hw_ctl_.reset(new micromodem::protobuf::HardwareControlCommand);
250  pending_hw_ctl_->CopyFrom(msg);
251  if (!pending_hw_ctl_->has_hw_ctl_dest())
252  pending_hw_ctl_->set_hw_ctl_dest(pending_hw_ctl_->command_dest());
253 
254  glog.is(VERBOSE) && glog << "Received HardwareControlCommand: " << msg.DebugString()
255  << std::endl;
256  }
257  else if (msg.GetDescriptor() == goby::acomms::protobuf::TimeUpdateRequest::descriptor())
258  {
260  request.CopyFrom(msg);
261 
262  pending_time_update_.reset(new goby::acomms::protobuf::TimeUpdateResponse);
263  pending_time_update_->set_time(request.time());
264  time_update_request_time_ = request.time();
265  pending_time_update_->set_request_src(request.src());
266  pending_time_update_->set_src(from_queue->modem_id());
267  pending_time_update_->set_dest(request.update_time_for_id());
268 
269  glog.is(VERBOSE) && glog << "Received TimeUpdateRequest: " << msg.DebugString()
270  << std::endl;
271  }
272 }
273 
274 void goby::acomms::Bridge::handle_link_ack(const protobuf::ModemTransmission& ack_msg,
275  const google::protobuf::Message& orig_msg,
276  QueueManager* from_queue)
277 {
278  // publish link ack
279  publish(orig_msg, "QueueAckOrig" + goby::util::as<std::string>(from_queue->modem_id()));
280 }
281 
282 void goby::acomms::Bridge::handle_modem_receive(
284 {
285  try
286  {
287  in_queue->handle_modem_receive(message);
288 
289  if (cfg_.forward_cacst())
290  {
291  for (int i = 0, n = message.ExtensionSize(micromodem::protobuf::receive_stat); i < n;
292  ++i)
293  {
295  message.GetExtension(micromodem::protobuf::receive_stat, i);
296 
297  glog.is(VERBOSE) && glog << "Forwarding statistics message to topside: " << cacst
298  << std::endl;
299  r_manager_.handle_in(in_queue->meta_from_msg(cacst), cacst, in_queue->modem_id());
300  }
301  }
302 
303  if (cfg_.forward_ranging_reply() &&
304  message.HasExtension(micromodem::protobuf::ranging_reply))
305  {
307  message.GetExtension(micromodem::protobuf::ranging_reply);
308 
309  glog.is(VERBOSE) && glog << "Forwarding ranging message to topside: " << ranging
310  << std::endl;
311  r_manager_.handle_in(in_queue->meta_from_msg(ranging), ranging, in_queue->modem_id());
312  }
313 
314  if (pending_time_update_)
315  {
316  if (message.type() == goby::acomms::protobuf::ModemTransmission::DRIVER_SPECIFIC &&
317  message.GetExtension(micromodem::protobuf::type) ==
318  micromodem::protobuf::MICROMODEM_TWO_WAY_PING)
319  {
321  message.GetExtension(micromodem::protobuf::ranging_reply);
322 
323  if (range_reply.one_way_travel_time_size() > 0)
324  pending_time_update_->set_time_of_flight_microsec(
325  range_reply.one_way_travel_time(0) * 1e6);
326 
327  glog.is(VERBOSE) && glog << "Received time of flight of "
328  << pending_time_update_->time_of_flight_microsec()
329  << " microseconds" << std::endl;
330  }
331  else if (message.type() == goby::acomms::protobuf::ModemTransmission::ACK &&
332  pending_time_update_->has_time_of_flight_microsec())
333  {
334  if (message.acked_frame_size() && message.acked_frame(0) == 0)
335  {
336  // ack for our response
337  glog.is(VERBOSE) && glog << "Received ack for TimeUpdateResponse" << std::endl;
338 
339  generate_time_update_network_ack(in_queue,
340  goby::acomms::protobuf::NetworkAck::ACK);
341  pending_time_update_.reset();
342  }
343  }
344  }
345 
346  if (pending_hw_ctl_ &&
347  message.type() == goby::acomms::protobuf::ModemTransmission::DRIVER_SPECIFIC &&
348  message.GetExtension(micromodem::protobuf::type) ==
349  micromodem::protobuf::MICROMODEM_HARDWARE_CONTROL_REPLY)
350  {
352  message.GetExtension(micromodem::protobuf::hw_ctl);
353 
354  if (message.src() == pending_hw_ctl_->hw_ctl_dest() &&
355  message.dest() == in_queue->modem_id())
356  {
357  glog.is(VERBOSE) && glog << "Received hardware control response: " << control
358  << " to our command: " << *pending_hw_ctl_ << std::endl;
359 
360  generate_hw_ctl_network_ack(in_queue, goby::acomms::protobuf::NetworkAck::ACK);
361  pending_hw_ctl_.reset();
362  }
363  }
364  }
365  catch (std::exception& e)
366  {
367  glog.is(WARN) && glog << "Failed to handle incoming message: " << e.what() << std::endl;
368  }
369 }
370 
371 void goby::acomms::Bridge::generate_hw_ctl_network_ack(
372  QueueManager* in_queue, goby::acomms::protobuf::NetworkAck::AckType type)
373 {
375  ack.set_ack_src(pending_hw_ctl_->hw_ctl_dest());
376  ack.set_message_dccl_id(DCCLCodec::get()->id(pending_hw_ctl_->GetDescriptor()));
377 
378  ack.set_message_src(pending_hw_ctl_->command_src());
379  ack.set_message_dest(pending_hw_ctl_->command_dest());
380  ack.set_message_time(pending_hw_ctl_->time());
381  ack.set_ack_type(type);
382 
383  r_manager_.handle_in(in_queue->meta_from_msg(ack), ack, in_queue->modem_id());
384 }
385 
386 void goby::acomms::Bridge::generate_time_update_network_ack(
387  QueueManager* in_queue, goby::acomms::protobuf::NetworkAck::AckType type)
388 {
390  ack.set_ack_src(pending_time_update_->dest());
391  ack.set_message_dccl_id(
392  DCCLCodec::get()->id(goby::acomms::protobuf::TimeUpdateRequest::descriptor()));
393 
394  ack.set_message_src(pending_time_update_->request_src());
395  ack.set_message_dest(pending_time_update_->dest());
396  ack.set_message_time(time_update_request_time_);
397  ack.set_ack_type(type);
398 
399  r_manager_.handle_in(in_queue->meta_from_msg(ack), ack, in_queue->modem_id());
400 }
401 
402 void goby::acomms::Bridge::handle_initiate_transmission(const protobuf::ModemTransmission& m,
403  int subnet)
404 {
405  // see if we need to override with a time update ping
406  if (pending_time_update_ &&
407  (m.dest() == pending_time_update_->dest() || m.dest() == QUERY_DESTINATION_ID))
408  {
409  protobuf::ModemTransmission new_transmission = m;
410  if (!pending_time_update_->has_time_of_flight_microsec())
411  {
412  new_transmission.set_dest(pending_time_update_->dest());
413  new_transmission.set_type(goby::acomms::protobuf::ModemTransmission::DRIVER_SPECIFIC);
414  new_transmission.SetExtension(micromodem::protobuf::type,
415  micromodem::protobuf::MICROMODEM_TWO_WAY_PING);
416  }
417  else
418  {
419  // send it out!
420  new_transmission.set_type(goby::acomms::protobuf::ModemTransmission::DATA);
421  new_transmission.set_ack_requested(true);
422  new_transmission.set_dest(pending_time_update_->dest());
423 
424  pending_time_update_->set_time(goby::common::goby_time<uint64>());
425 
426  goby::acomms::DCCLCodec::get()->encode(new_transmission.add_frame(),
427  *pending_time_update_);
428  }
429  publish(new_transmission,
430  "Tx" + goby::util::as<std::string>(cfg_.subnet(subnet).queue_cfg().modem_id()));
431  }
432  // see if we need to override with a hardware control command
433  else if (pending_hw_ctl_ &&
434  (m.dest() == pending_hw_ctl_->hw_ctl_dest() || m.dest() == QUERY_DESTINATION_ID))
435  {
436  protobuf::ModemTransmission new_transmission = m;
437  new_transmission.set_dest(pending_hw_ctl_->hw_ctl_dest());
438  new_transmission.set_type(goby::acomms::protobuf::ModemTransmission::DRIVER_SPECIFIC);
439  new_transmission.SetExtension(micromodem::protobuf::type,
440  micromodem::protobuf::MICROMODEM_HARDWARE_CONTROL);
441  new_transmission.MutableExtension(micromodem::protobuf::hw_ctl)
442  ->CopyFrom(pending_hw_ctl_->control());
443  publish(new_transmission,
444  "Tx" + goby::util::as<std::string>(cfg_.subnet(subnet).queue_cfg().modem_id()));
445  }
446  else
447  {
448  publish(m, "Tx" + goby::util::as<std::string>(cfg_.subnet(subnet).queue_cfg().modem_id()));
449  }
450 }
451 
452 void goby::acomms::Bridge::handle_data_request(const protobuf::ModemTransmission& orig_msg,
453  int subnet)
454 {
455  protobuf::ModemTransmission msg = orig_msg;
456  q_managers_[subnet]->handle_modem_data_request(&msg);
457  publish(msg, "DataResponse" +
458  goby::util::as<std::string>(cfg_.subnet(subnet).queue_cfg().modem_id()));
459 }
460 
461 void goby::acomms::Bridge::handle_driver_status(const protobuf::ModemDriverStatus& m, int subnet)
462 {
463  glog.is(VERBOSE) && glog << "Forwarding modemdriver status message to topside: "
464  << m.ShortDebugString() << std::endl;
465  QueueManager* in_queue = q_managers_[subnet].get();
466 
467  r_manager_.handle_in(in_queue->meta_from_msg(m), m, in_queue->modem_id());
468 }
provides an API to the goby-acomms Queuing Library.
Definition: queue_manager.h:49
Base class provided for users to generate applications that participate in the Goby publish/subscribe...
Definition: application.h:49
uint64 goby_time< uint64 >()
Returns current UTC time as integer microseconds since 1970-01-01 00:00:00.
Definition: time.h:113
static DCCLCodec * get()
DCCLCodec is a singleton class; use this to get a pointer to the class.
Definition: dccl.h:124
void handle_modem_receive(const protobuf::ModemTransmission &message)
Receive incoming data from the modem.
provides an API to the goby-acomms MAC library. MACManager is essentially a std::list<protobuf::Modem...
Definition: mac_manager.h:51
void connect(Signal *signal, Slot slot)
connect a signal to a slot (e.g. function pointer)
Definition: connect.h:36
common::FlexOstream glog
Access the Goby logger through this object.
The global namespace for the Goby project.
int modem_id()
The current modem ID (MAC address) of this node.
google::protobuf::uint64 uint64
an unsigned 64 bit integer
void bind(ModemDriverBase &driver, QueueManager &queue_manager)
binds the driver link-layer callbacks to the QueueManager
Definition: bind.h:43
void push_message(const google::protobuf::Message &new_message)
Push a message (and add the queue if it does not exist)
const int QUERY_DESTINATION_ID
special modem id used internally to goby-acomms for indicating that the MAC layer (amac) is agnostic ...