Goby v2
queue_manager.cpp
1 // Copyright 2009-2018 Toby Schneider (http://gobysoft.org/index.wt/people/toby)
2 // GobySoft, LLC (2013-)
3 // Massachusetts Institute of Technology (2007-2014)
4 // Community contributors (see AUTHORS file)
5 //
6 //
7 // This file is part of the Goby Underwater Autonomy Project Libraries
8 // ("The Goby Libraries").
9 //
10 // The Goby Libraries are free software: you can redistribute them and/or modify
11 // them under the terms of the GNU Lesser General Public License as published by
12 // the Free Software Foundation, either version 2.1 of the License, or
13 // (at your option) any later version.
14 //
15 // The Goby Libraries are distributed in the hope that they will be useful,
16 // but WITHOUT ANY WARRANTY; without even the implied warranty of
17 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 // GNU Lesser General Public License for more details.
19 //
20 // You should have received a copy of the GNU Lesser General Public License
21 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
22 
23 #include <boost/foreach.hpp>
24 
25 #include "goby/common/logger.h"
26 #include "goby/common/time.h"
27 #include "goby/util/binary.h"
28 
29 #include "goby/acomms/dccl.h"
30 #include "goby/common/logger.h"
31 #include "goby/util/dynamic_protobuf_manager.h"
32 
33 #include "queue_constants.h"
34 #include "queue_manager.h"
35 
36 using goby::glog;
37 using goby::util::as;
38 using namespace goby::common::logger;
39 
40 int goby::acomms::QueueManager::count_ = 0;
41 
43  : packet_ack_(0), packet_dest_(BROADCAST_ID), codec_(goby::acomms::DCCLCodec::get())
44 {
45  ++count_;
46 
47  glog_push_group_ = "goby::acomms::queue::push::" + as<std::string>(count_);
48  glog_pop_group_ = "goby::acomms::queue::pop::" + as<std::string>(count_);
49  glog_priority_group_ = "goby::acomms::queue::priority::" + as<std::string>(count_);
50  glog_out_group_ = "goby::acomms::queue::out::" + as<std::string>(count_);
51  glog_in_group_ = "goby::acomms::queue::in::" + as<std::string>(count_);
52 
53  goby::glog.add_group(glog_push_group_, common::Colors::lt_cyan);
54  goby::glog.add_group(glog_pop_group_, common::Colors::lt_green);
55  goby::glog.add_group(glog_priority_group_, common::Colors::yellow);
56  goby::glog.add_group(glog_out_group_, common::Colors::cyan);
57  goby::glog.add_group(glog_in_group_, common::Colors::green);
58 
60 
61  assert(ack.GetDescriptor() ==
62  google::protobuf::DescriptorPool::generated_pool()->FindMessageTypeByName(
63  "goby.acomms.protobuf.NetworkAck"));
64 
65  assert(ack.GetDescriptor() == goby::util::DynamicProtobufManager::new_protobuf_message(
66  "goby.acomms.protobuf.NetworkAck")
67  ->GetDescriptor());
68 }
69 
71  const google::protobuf::Descriptor* desc,
72  const protobuf::QueuedMessageEntry& queue_cfg /*= protobuf::QueuedMessageEntry()*/)
73 {
74  try
75  {
76  //validate with DCCL first
77  codec_->validate(desc);
78  }
79  catch (DCCLException& e)
80  {
81  throw(QueueException("could not create queue for message: " + desc->full_name() +
82  " because it failed DCCL validation: " + e.what()));
83  }
84 
85  // does the queue exist?
86  unsigned dccl_id = codec_->id(desc);
87  if (queues_.count(dccl_id))
88  {
89  glog.is(DEBUG1) && glog << group(glog_push_group_)
90  << "Updating config for queue: " << desc->full_name()
91  << " with: " << queue_cfg.ShortDebugString() << std::endl;
92 
93  queues_.find(dccl_id)->second->set_cfg(queue_cfg);
94  return;
95  }
96 
97  // add the newly generated queue
98  if (queues_.count(dccl_id))
99  {
100  std::stringstream ss;
101  ss << "Queue: duplicate message specified for DCCL ID: " << dccl_id;
102  throw QueueException(ss.str());
103  }
104  else
105  {
106  std::pair<std::map<unsigned, boost::shared_ptr<Queue> >::iterator, bool> new_q_pair =
107  queues_.insert(std::make_pair(
108  dccl_id, boost::shared_ptr<Queue>(new Queue(desc, this, queue_cfg))));
109 
110  Queue& new_q = *((new_q_pair.first)->second);
111 
112  qsize(&new_q);
113 
114  glog.is(DEBUG1) && glog << group(glog_out_group_) << "Added new queue: \n"
115  << new_q << std::endl;
116  }
117 }
118 
120 {
121  for (std::map<unsigned, boost::shared_ptr<Queue> >::iterator it = queues_.begin(),
122  n = queues_.end();
123  it != n; ++it)
124  {
125  std::vector<boost::shared_ptr<google::protobuf::Message> > expired_msgs =
126  it->second->expire();
127 
128  BOOST_FOREACH (boost::shared_ptr<google::protobuf::Message> expire, expired_msgs)
129  {
130  signal_expire(*expire);
131  if (network_ack_src_ids_.count(meta_from_msg(*expire).src()))
132  create_network_ack(modem_id_, *expire, goby::acomms::protobuf::NetworkAck::EXPIRE);
133  }
134  }
135 }
136 
138 {
139  push_message(dccl_msg, 0);
140 }
141 
143  const protobuf::QueuedMessageMeta* meta)
144 {
145  const google::protobuf::Descriptor* desc = dccl_msg.GetDescriptor();
146  unsigned dccl_id = codec_->id(desc);
147 
148  if (!queues_.count(dccl_id))
149  throw(QueueException("No queue exists for message: " + desc->full_name() +
150  "; you must configure it first."));
151 
152  // add the message
153  boost::shared_ptr<google::protobuf::Message> new_dccl_msg(dccl_msg.New());
154  new_dccl_msg->CopyFrom(dccl_msg);
155 
156  if (meta)
157  queues_.find(dccl_id)->second->push_message(new_dccl_msg, *meta);
158  else
159  queues_.find(dccl_id)->second->push_message(new_dccl_msg);
160 
161  qsize(queues_[dccl_id].get());
162 }
163 
165 {
166  std::map<unsigned, boost::shared_ptr<Queue> >::iterator it = queues_.find(flush.dccl_id());
167 
168  if (it != queues_.end())
169  {
170  it->second->flush();
171  glog.is(DEBUG1) && glog << group(glog_out_group_) << msg_string(it->second->descriptor())
172  << ": flushed queue" << std::endl;
173  qsize(it->second.get());
174  }
175  else
176  {
177  glog.is(DEBUG1) && glog << group(glog_out_group_) << warn
178  << "Cannot find queue to flush: " << flush << std::endl;
179  }
180 }
181 
182 void goby::acomms::QueueManager::info_all(std::ostream* os) const
183 {
184  *os << "= Begin QueueManager [[" << queues_.size() << " queues total]] =" << std::endl;
185  for (std::map<unsigned, boost::shared_ptr<Queue> >::const_iterator it = queues_.begin(),
186  n = queues_.end();
187  it != n; ++it)
188  info(it->second->descriptor(), os);
189  *os << "= End QueueManager =";
190 }
191 
192 void goby::acomms::QueueManager::info(const google::protobuf::Descriptor* desc,
193  std::ostream* os) const
194 {
195  std::map<unsigned, boost::shared_ptr<Queue> >::const_iterator it =
196  queues_.find(codec_->id(desc));
197 
198  if (it != queues_.end())
199  it->second->info(os);
200  else
201  *os << "No such queue [[" << desc->full_name() << "]] loaded"
202  << "\n";
203 
204  codec_->info(desc, os);
205 }
206 
207 std::ostream& goby::acomms::operator<<(std::ostream& out, const QueueManager& d)
208 {
209  d.info_all(&out);
210  return out;
211 }
212 
213 // finds and publishes outgoing data for the modem driver
214 // first query every Queue for its priority data using
215 // priority_values(priority, last_send_time)
216 // priority_values returns false if that object has no data to give
217 // (either no data at all, or in blackout interval)
218 // thus, from all the priority values that return true, pick the one with the lowest
219 // priority value, or given a tie, pick the one with the oldest last_send_time
221 {
222  // clear old waiting acknowledgments and reset packet defaults
223  clear_packet(*msg);
224 
225  for (unsigned frame_number = msg->frame_start(),
226  total_frames = msg->max_num_frames() + msg->frame_start();
227  frame_number < total_frames; ++frame_number)
228  {
229  std::string* data = 0;
230  if ((frame_number - msg->frame_start()) < (unsigned)msg->frame_size())
231  data = msg->mutable_frame(frame_number - msg->frame_start());
232  else
233  data = msg->add_frame();
234  unsigned original_data_size = data->size();
235 
236  glog.is(DEBUG2) && glog << group(glog_priority_group_) << "Finding next sender: " << *msg
237  << std::flush;
238 
239  // first (0th) user-frame
240  Queue* winning_queue = find_next_sender(*msg, *data, true);
241 
242  // no data at all for this frame ... :(
243  if (!winning_queue)
244  {
245  msg->set_dest(packet_dest_);
246  glog.is(DEBUG1) && glog << group(glog_out_group_)
247  << "No data found. sending empty message to modem driver."
248  << std::endl;
249  }
250  else
251  {
252  std::list<QueuedMessage> dccl_msgs;
253 
254  // set true if we are passing on encrypted data untouched
255  bool using_encrypted_body = false;
256  std::string passthrough_message;
257 
258  while (winning_queue)
259  {
260  // new user frame (e.g. 32B)
261  QueuedMessage next_user_frame = winning_queue->give_data(frame_number);
262 
263  if (next_user_frame.meta.has_encoded_message())
264  {
265  using_encrypted_body = true;
266  passthrough_message = next_user_frame.meta.encoded_message();
267  }
268 
269  glog.is(DEBUG1) && glog << group(glog_out_group_)
270  << msg_string(winning_queue->descriptor())
271  << ": sending data to modem driver (destination: "
272  << next_user_frame.meta.dest() << ")" << std::endl;
273 
274  if (manip_manager_.has(codec_->id(winning_queue->descriptor()),
275  protobuf::LOOPBACK_AS_SENT))
276  {
277  glog.is(DEBUG1) &&
278  glog << group(glog_out_group_) << winning_queue->descriptor()->full_name()
279  << ": LOOPBACK_AS_SENT manipulator set, sending back to decoder"
280  << std::endl;
281  signal_receive(*next_user_frame.dccl_msg);
282  }
283 
284  //
285  // ACK
286  //
287  // insert ack if desired
288  if (next_user_frame.meta.ack_requested())
289  {
290  glog.is(DEBUG2) && glog << group(glog_out_group_)
291  << "inserting ack for queue: " << *winning_queue
292  << " for frame: " << frame_number << std::endl;
293  waiting_for_ack_.insert(
294  std::pair<unsigned, Queue*>(frame_number, winning_queue));
295  }
296  else
297  {
298  glog.is(DEBUG2) && glog << group(glog_out_group_)
299  << "no ack, popping from queue: " << *winning_queue
300  << std::endl;
301  if (!winning_queue->pop_message(frame_number))
302  glog.is(DEBUG1) && glog << group(glog_out_group_)
303  << "failed to pop from queue: " << *winning_queue
304  << std::endl;
305 
306  qsize(winning_queue); // notify change in queue size
307  }
308 
309  // if an ack been set, do not unset these
310  if (packet_ack_ == false)
311  packet_ack_ = next_user_frame.meta.ack_requested();
312 
313  //
314  // DEST
315  //
316  // update destination address
317  if (frame_number == msg->frame_start())
318  {
319  // discipline the destination of the packet if initially unset
320  if (msg->dest() == QUERY_DESTINATION_ID)
321  msg->set_dest(next_user_frame.meta.dest());
322 
323  if (msg->src() == QUERY_SOURCE_ID)
324  msg->set_src(modem_id_);
325 
326  if (packet_dest_ == BROADCAST_ID || packet_dest_ == QUERY_DESTINATION_ID)
327  packet_dest_ = msg->dest();
328  }
329 
330  //
331  // DCCL
332  //
333  // // e.g. 32B
334  // std::string new_data = next_user_frame;
335 
336  // // insert the size of the next field (e.g. 32B) right after the header
337  // std::string frame_size(USER_FRAME_NEXT_SIZE_BYTES,
338  // static_cast<char>((next_user_frame.data().size()-DCCL_NUM_HEADER_BYTES)));
339  // new_data.insert(DCCL_NUM_HEADER_BYTES, frame_size);
340 
341  // fix the destination
342  next_user_frame.meta = meta_from_msg(*next_user_frame.dccl_msg);
343  dccl_msgs.push_back(next_user_frame);
344 
345  //
346  if (using_encrypted_body)
347  {
348  // can't pack multiple messages here
349  break;
350  }
351  else
352  {
353  unsigned repeated_size_bytes = size_repeated(dccl_msgs);
354 
355  glog.is(DEBUG2) && glog << group(glog_out_group_) << "Size repeated "
356  << repeated_size_bytes << std::endl;
357  data->resize(repeated_size_bytes + original_data_size);
358 
359  // if remaining bytes is greater than 0, we have a chance of adding another user-frame
360  if ((msg->max_frame_bytes() - data->size()) > 0)
361  {
362  // fetch the next candidate
363  winning_queue = find_next_sender(*msg, *data, false);
364  }
365  else
366  {
367  break;
368  }
369  }
370  }
371 
372  // finally actually encode the message
373  try
374  {
375  if (using_encrypted_body)
376  {
377  glog.is(DEBUG2) &&
378  glog << group(glog_out_group_)
379  << "Encoding head only, passing through (encrypted?) body."
380  << std::endl;
381 
382  *data = data->substr(0, original_data_size);
383  // encode all the messages but the last (these must be unencrypted)
384  if (dccl_msgs.size() > 1)
385  {
386  std::list<QueuedMessage>::iterator it_back = dccl_msgs.end();
387  --it_back;
388  *data +=
389  encode_repeated(std::list<QueuedMessage>(dccl_msgs.begin(), it_back));
390  }
391 
392  std::string head;
393  codec_->encode(&head, *dccl_msgs.back().dccl_msg, true);
394  *data += head + passthrough_message.substr(head.size());
395  }
396  else
397  {
398  *data = data->substr(0, original_data_size) + encode_repeated(dccl_msgs);
399  }
400  }
401  catch (DCCLException& e)
402  {
403  *data = "";
404  glog.is(DEBUG1) && glog << group(glog_out_group_) << warn
405  << "Failed to encode, discarding message:" << e.what()
406  << std::endl;
407  }
408  }
409  }
410  // only discipline the ACK value at the end, after all chances of making packet_ack_ = true are done
411  msg->set_ack_requested(packet_ack_);
412 }
413 
414 std::string goby::acomms::QueueManager::encode_repeated(const std::list<QueuedMessage>& msgs)
415 {
416  std::string out;
417  BOOST_FOREACH (const QueuedMessage& msg, msgs)
418  {
419  if (encrypt_rules_.size())
420  {
422  std::map<ModemId, std::string>::const_iterator it =
423  encrypt_rules_.find(msg.meta.dest());
424 
425  if (it != encrypt_rules_.end())
426  {
427  cfg.set_crypto_passphrase(it->second);
428  }
429 
430  codec_->merge_cfg(cfg);
431  }
432 
433  std::string piece;
434  codec_->encode(&piece, *(msg.dccl_msg));
435  out += piece;
436  }
437  return out;
438 }
439 
440 std::list<goby::acomms::QueuedMessage>
441 goby::acomms::QueueManager::decode_repeated(const std::string& orig_bytes)
442 {
443  std::string bytes = orig_bytes;
444  std::list<QueuedMessage> out;
445  while (!bytes.empty())
446  {
447  try
448  {
449  QueuedMessage msg;
450 
451  if (encrypt_rules_.size())
452  {
453  boost::shared_ptr<google::protobuf::Message> header =
454  codec_->decode<boost::shared_ptr<google::protobuf::Message> >(bytes, true);
455 
456  msg.meta = meta_from_msg(*header);
457 
459  std::map<ModemId, std::string>::const_iterator it =
460  encrypt_rules_.find(msg.meta.src());
461 
462  if (it != encrypt_rules_.end())
463  {
464  cfg.set_crypto_passphrase(it->second);
465  }
466 
467  codec_->merge_cfg(cfg);
468  }
469 
470  msg.dccl_msg = codec_->decode<boost::shared_ptr<google::protobuf::Message> >(bytes);
471 
472  if (!encrypt_rules_.size())
473  msg.meta = meta_from_msg(*(msg.dccl_msg));
474 
475  out.push_back(msg);
476  unsigned last_size = codec_->size(*(out.back().dccl_msg));
477  glog.is(common::logger::DEBUG1) && glog << group(glog_in_group_)
478  << "last message size was: " << last_size
479  << std::endl;
480  bytes.erase(0, last_size);
481  }
482  catch (dccl::Exception& e)
483  {
484  if (out.empty())
485  throw(e);
486  else
487  {
488  glog.is(common::logger::WARN) &&
489  glog << group(glog_in_group_) << "failed to decode "
490  << goby::util::hex_encode(bytes) << " but returning parts already decoded"
491  << std::endl;
492  return out;
493  }
494  }
495  }
496  return out;
497 }
498 
499 unsigned goby::acomms::QueueManager::size_repeated(const std::list<QueuedMessage>& msgs)
500 {
501  unsigned out = 0;
502  BOOST_FOREACH (const QueuedMessage& msg, msgs)
503  out += codec_->size(*(msg.dccl_msg));
504  return out;
505 }
506 
507 void goby::acomms::QueueManager::clear_packet(const protobuf::ModemTransmission& message)
508 {
509  for (std::multimap<unsigned, Queue*>::iterator it = waiting_for_ack_.begin(),
510  end = waiting_for_ack_.end();
511  it != end;)
512  {
513  if (it->second->clear_ack_queue(message.frame_start()))
514  waiting_for_ack_.erase(it++);
515  else
516  ++it;
517  }
518 
519  packet_ack_ = message.has_ack_requested() ? message.ack_requested() : false;
520  packet_dest_ = message.dest();
521 }
522 
524 goby::acomms::QueueManager::find_next_sender(const protobuf::ModemTransmission& request_msg,
525  const std::string& data, bool first_user_frame)
526 {
527  // competition between variable about who gets to send
528  double winning_priority;
529  boost::posix_time::ptime winning_last_send_time;
530 
531  Queue* winning_queue = 0;
532 
533  glog.is(DEBUG1) && glog << group(glog_priority_group_) << "Starting priority contest\n"
534  << "\tRequesting " << request_msg.max_num_frames() << " frame(s), have "
535  << data.size() << "/" << request_msg.max_frame_bytes() << "B"
536  << std::endl;
537 
538  for (std::map<unsigned, boost::shared_ptr<Queue> >::iterator it = queues_.begin(),
539  n = queues_.end();
540  it != n; ++it)
541  {
542  Queue& q = *(it->second);
543 
544  // encode on demand
545  if (manip_manager_.has(codec_->id(q.descriptor()), protobuf::ON_DEMAND) &&
546  (!q.size() || q.newest_msg_time() + boost::posix_time::microseconds(
547  cfg_.on_demand_skew_seconds() * 1e6) <
549  {
550  boost::shared_ptr<google::protobuf::Message> new_msg =
551  goby::util::DynamicProtobufManager::new_protobuf_message(q.descriptor());
552  signal_data_on_demand(request_msg, new_msg.get());
553 
554  if (new_msg->IsInitialized())
555  push_message(*new_msg);
556  }
557 
558  double priority;
559  boost::posix_time::ptime last_send_time;
560  if (q.get_priority_values(&priority, &last_send_time, request_msg, data))
561  {
562  // no winner, better winner, or equal & older winner
563  if ((!winning_queue || priority > winning_priority ||
564  (priority == winning_priority && last_send_time < winning_last_send_time)))
565  {
566  winning_priority = priority;
567  winning_last_send_time = last_send_time;
568  winning_queue = &q;
569  }
570  }
571  }
572 
573  glog.is(DEBUG1) && glog << group(glog_priority_group_) << "\t"
574  << "all other queues have no messages" << std::endl;
575 
576  if (winning_queue)
577  {
578  glog.is(DEBUG1) && glog << group(glog_priority_group_) << winning_queue->name()
579  << " has highest priority." << std::endl;
580  }
581  else
582  {
583  glog.is(DEBUG1) && glog << group(glog_priority_group_) << "ending priority contest"
584  << std::endl;
585  }
586 
587  return winning_queue;
588 }
589 
590 void goby::acomms::QueueManager::process_modem_ack(const protobuf::ModemTransmission& ack_msg)
591 {
592  for (int i = 0, n = ack_msg.acked_frame_size(); i < n; ++i)
593  {
594  int frame_number = ack_msg.acked_frame(i);
595  if (ack_msg.dest() != modem_id_)
596  {
597  glog.is(WARN) && glog << group(glog_in_group_)
598  << "ignoring ack for modem_id = " << ack_msg.dest() << std::endl;
599  continue;
600  }
601  else if (!waiting_for_ack_.count(frame_number))
602  {
603  glog.is(DEBUG1) && glog << group(glog_in_group_)
604  << "got ack but we were not expecting one from "
605  << ack_msg.src() << " for frame " << frame_number << std::endl;
606  continue;
607  }
608  else
609  {
610  // got an ack, let's pop this!
611  glog.is(DEBUG1) && glog << group(glog_in_group_) << "received ack for us from "
612  << ack_msg.src() << " for frame " << frame_number << std::endl;
613 
614  std::multimap<unsigned, Queue*>::iterator it = waiting_for_ack_.find(frame_number);
615  while (it != waiting_for_ack_.end())
616  {
617  Queue* q = it->second;
618 
619  boost::shared_ptr<google::protobuf::Message> removed_msg;
620  if (!q->pop_message_ack(frame_number, removed_msg))
621  {
622  glog.is(DEBUG1) && glog << group(glog_in_group_) << warn
623  << "failed to pop message from " << q->name()
624  << std::endl;
625  }
626  else
627  {
628  qsize(q);
629  signal_ack(ack_msg, *removed_msg);
630  if (network_ack_src_ids_.count(meta_from_msg(*removed_msg).src()))
631  create_network_ack(ack_msg.src(), *removed_msg,
632  goby::acomms::protobuf::NetworkAck::ACK);
633  }
634 
635  glog.is(DEBUG2) && glog << group(glog_in_group_) << ack_msg << std::endl;
636 
637  waiting_for_ack_.erase(it);
638 
639  it = waiting_for_ack_.find(frame_number);
640  }
641  }
642  }
643 }
644 
645 // parses and publishes incoming data
646 // by matching the variableID field with the variable specified
647 // in a "receive = " line of the configuration file
649  const protobuf::ModemTransmission& modem_message)
650 {
651  glog.is(DEBUG2) && glog << group(glog_in_group_) << "Received message"
652  << ": " << modem_message << std::endl;
653 
654  if (modem_message.type() == protobuf::ModemTransmission::ACK)
655  {
656  process_modem_ack(modem_message);
657  }
658  else
659  {
660  for (int frame_number = 0, total_frames = modem_message.frame_size();
661  frame_number < total_frames; ++frame_number)
662  {
663  try
664  {
665  glog.is(DEBUG1) && glog << group(glog_in_group_) << "Received DATA message from "
666  << modem_message.src() << std::endl;
667 
668  std::list<QueuedMessage> dccl_msgs;
669 
670  if (!cfg_.skip_decoding())
671  {
672  dccl_msgs = decode_repeated(modem_message.frame(frame_number));
673  }
674 
675  BOOST_FOREACH (const QueuedMessage& decoded_message, dccl_msgs)
676  {
677  const protobuf::QueuedMessageMeta& meta_msg = decoded_message.meta;
678 
679  int32 dest = meta_msg.dest();
680 
681  const google::protobuf::Descriptor* desc =
682  decoded_message.dccl_msg->GetDescriptor();
683 
684  if (dest != BROADCAST_ID && dest != modem_id_ &&
685  !manip_manager_.has(codec_->id(desc), protobuf::PROMISCUOUS))
686  {
687  glog.is(DEBUG1) && glog << group(glog_in_group_)
688  << "ignoring DCCL message for modem_id = " << dest
689  << std::endl;
690  }
691  else if (dest == BROADCAST_ID && meta_msg.src() == modem_id_)
692  {
693  glog.is(DEBUG1) && glog << group(glog_in_group_)
694  << "ignoring broadcast message that we sent"
695  << std::endl;
696  }
697  else if (manip_manager_.has(codec_->id(desc), protobuf::NO_DEQUEUE))
698  {
699  glog.is(DEBUG1) && glog << group(glog_in_group_)
700  << "ignoring message: " << desc->full_name()
701  << " because NO_DEQUEUE manipulator set"
702  << std::endl;
703  }
704  else
705  {
706  signal_receive(*(decoded_message.dccl_msg));
707  }
708  }
709  }
710  catch (DCCLException& e)
711  {
712  glog.is(DEBUG1) && glog << group(glog_in_group_) << warn << "failed to decode "
713  << e.what() << std::endl;
714  }
715 
716  try
717  {
718  if (!signal_in_route.empty())
719  {
720  // decode only header
721  boost::shared_ptr<google::protobuf::Message> decoded_message =
722  codec_->decode<boost::shared_ptr<google::protobuf::Message> >(
723  modem_message.frame(frame_number), true);
724  protobuf::QueuedMessageMeta meta_msg = meta_from_msg(*decoded_message);
725  // messages addressed to us on the link
726  if (modem_message.dest() == modem_id_ ||
727  (route_additional_modem_ids_.count(modem_message.dest())))
728  {
729  meta_msg.set_encoded_message(modem_message.frame(frame_number));
730  meta_msg.set_non_repeated_size(meta_msg.encoded_message().size());
731  signal_in_route(meta_msg, *decoded_message, modem_id_);
732  }
733  }
734  }
735  catch (DCCLException& e)
736  {
737  glog.is(DEBUG1) && glog << group(glog_in_group_) << warn
738  << "failed to decode header for routing: " << e.what()
739  << std::endl;
740  }
741  }
742  }
743 }
744 
746 {
747  cfg_ = cfg;
748  process_cfg();
749 }
750 
752 {
753  cfg_.MergeFrom(cfg);
754  process_cfg();
755 }
756 
757 void goby::acomms::QueueManager::process_cfg()
758 {
759  modem_id_ = cfg_.modem_id();
760  manip_manager_.clear();
761  network_ack_src_ids_.clear();
762  route_additional_modem_ids_.clear();
763  encrypt_rules_.clear();
764 
765  for (int i = 0, n = cfg_.message_entry_size(); i < n; ++i)
766  {
767  const google::protobuf::Descriptor* desc =
768  goby::util::DynamicProtobufManager::find_descriptor(
769  cfg_.message_entry(i).protobuf_name());
770  if (desc)
771  {
772  add_queue(desc, cfg_.message_entry(i));
773 
774  for (int j = 0, m = cfg_.message_entry(i).manipulator_size(); j < m; ++j)
775  { manip_manager_.add(codec_->id(desc), cfg_.message_entry(i).manipulator(j)); } }
776  else
777  {
778  glog.is(DEBUG1) &&
779  glog << group(glog_push_group_) << warn
780  << "No message by the name: " << cfg_.message_entry(i).protobuf_name()
781  << " found, not setting Queue options for this type." << std::endl;
782  }
783  }
784 
785  for (int i = 0, n = cfg_.make_network_ack_for_src_id_size(); i < n; ++i)
786  {
787  glog.is(DEBUG1) &&
788  glog << group(glog_push_group_)
789  << "Generating NetworkAck for messages required ACK from source ID: "
790  << cfg_.make_network_ack_for_src_id(i) << std::endl;
791 
792  network_ack_src_ids_.insert(cfg_.make_network_ack_for_src_id(i));
793  }
794 
795  for (int i = 0, n = cfg_.route_for_additional_modem_id_size(); i < n; ++i)
796  {
797  glog.is(DEBUG1) && glog << group(glog_push_group_)
798  << "Also routing messages addressed to link layer destination ID: "
799  << cfg_.route_for_additional_modem_id(i) << std::endl;
800 
801  route_additional_modem_ids_.insert(cfg_.route_for_additional_modem_id(i));
802  }
803 
804  for (int i = 0, n = cfg_.encrypt_rule_size(); i < n; ++i)
805  {
806  glog.is(DEBUG1) && glog << group(glog_push_group_)
807  << "Adding encrypt rule for ModemId: " << cfg_.encrypt_rule(i).id()
808  << std::endl;
809 
810  encrypt_rules_[cfg_.encrypt_rule(i).id()] = cfg_.encrypt_rule(i).crypto_passphrase();
811  }
812 }
813 
814 void goby::acomms::QueueManager::qsize(Queue* q)
815 {
816  protobuf::QueueSize size;
817  size.set_dccl_id(codec_->id(q->descriptor()));
818  size.set_size(q->size());
820 }
821 
822 void goby::acomms::QueueManager::create_network_ack(
823  int ack_src, const google::protobuf::Message& orig_msg,
824  goby::acomms::protobuf::NetworkAck::AckType ack_type)
825 {
826  if (orig_msg.GetDescriptor()->full_name() == "goby.acomms.protobuf.NetworkAck")
827  {
828  glog.is(DEBUG1) && glog << group(glog_in_group_)
829  << "Not generating network ack from NetworkAck to avoid infinite "
830  "proliferation of ACKS."
831  << std::endl;
832  return;
833  }
835  ack.set_ack_src(ack_src);
836  ack.set_message_dccl_id(DCCLCodec::get()->id(orig_msg.GetDescriptor()));
837 
838  protobuf::QueuedMessageMeta meta = meta_from_msg(orig_msg);
839 
840  if (!network_ack_src_ids_.count(meta.src()))
841  {
842  glog.is(DEBUG1) && glog << group(glog_in_group_)
843  << "Not generating network ack for message from source ID: "
844  << meta.src() << " as we weren't asked to do so." << std::endl;
845  return;
846  }
847 
848  ack.set_message_src(meta.src());
849  ack.set_message_dest(meta.dest());
850  ack.set_message_time(meta.time());
851  ack.set_ack_type(ack_type);
852 
853  glog.is(VERBOSE) && glog << group(glog_in_group_)
854  << "Generated network ack: " << ack.DebugString()
855  << "from: " << orig_msg.DebugString() << std::endl;
856 
857  const protobuf::QueuedMessageMeta network_ack_meta = meta_from_msg(ack);
858 
859  if (network_ack_meta.dest() == modem_id_)
860  signal_receive(ack);
861  else
862  signal_in_route(network_ack_meta, ack, modem_id_);
863 }
provides an API to the goby-acomms Queuing Library.
Definition: queue_manager.h:49
boost::signals2::signal< void(const protobuf::ModemTransmission &ack_msg, const google::protobuf::Message &orig_msg)> signal_ack
Signals when acknowledgment of proper message receipt has been received. This is only sent for queues...
static DCCLCodec * get()
DCCLCodec is a singleton class; use this to get a pointer to the class.
Definition: dccl.h:124
boost::signals2::signal< void(const protobuf::ModemTransmission &request_msg, google::protobuf::Message *data_msg)> signal_data_on_demand
Forwards the data request to the application layer. This advanced feature is used when queue...
boost::signals2::signal< void(const google::protobuf::Message &msg)> signal_receive
Signals when a DCCL message is received.
Exception class for libdccl.
void handle_modem_data_request(protobuf::ModemTransmission *msg)
Finds data to send to the modem.
void add_queue(const protobuf::QueuedMessageEntry &queue_cfg)
Add a DCCL queue for use with QueueManager. Note that the queue must be added before receiving messag...
Definition: queue_manager.h:72
boost::signals2::signal< void(const protobuf::QueuedMessageMeta &meta, const google::protobuf::Message &data_msg, int modem_id)> signal_in_route
Used by a router to intercept messages and requeue them if desired.
void merge_cfg(const protobuf::QueueManagerConfig &cfg)
Set (and merge "repeat" fields) the current configuration. (protobuf::QueueManagerConfig defined in a...
void handle_modem_receive(const protobuf::ModemTransmission &message)
Receive incoming data from the modem.
ReturnType goby_time()
Returns current UTC time as a boost::posix_time::ptime.
Definition: time.h:104
const int BROADCAST_ID
special modem id for the broadcast destination - no one is assigned this address. Analogous to 192...
void add_group(const std::string &name, Colors::Color color=Colors::nocolor, const std::string &description="")
Add another group to the logger. A group provides related manipulator for categorizing log messages...
std::ostream & operator<<(std::ostream &out, const QueueManager &d)
outputs information about all available messages (same as info_all)
common::FlexOstream glog
Access the Goby logger through this object.
The global namespace for the Goby project.
boost::signals2::signal< void(const google::protobuf::Message &orig_msg)> signal_expire
Signals when a message is expires (exceeds its time-to-live or ttl) before being sent (if queue...
void set_cfg(const protobuf::QueueManagerConfig &cfg)
Set (and overwrite completely if present) the current configuration. (protobuf::QueueManagerConfig de...
boost::signals2::signal< void(protobuf::QueueSize size)> signal_queue_size_change
Signals when any queue changes size (message is popped or pushed)
void info_all(std::ostream *os) const
Writes a human readable summary (including DCCLCodec info) of all loaded queues.
void do_work()
Calculates which messages have expired and emits the goby::acomms::QueueManager::signal_expire as nec...
void info(std::ostream *os) const
Writes a human readable summary (including DCCLCodec info) of the queue for the provided DCCL type to...
void flush_queue(const protobuf::QueueFlush &flush)
Flush (delete all messages in) a queue.
void push_message(const google::protobuf::Message &new_message)
Push a message (and add the queue if it does not exist)
google::protobuf::int32 int32
a signed 32 bit integer
const int QUERY_DESTINATION_ID
special modem id used internally to goby-acomms for indicating that the MAC layer (amac) is agnostic ...