23 #include <boost/foreach.hpp> 25 #include "goby/common/logger.h" 26 #include "goby/common/time.h" 27 #include "goby/util/binary.h" 29 #include "goby/acomms/dccl.h" 30 #include "goby/common/logger.h" 31 #include "goby/util/dynamic_protobuf_manager.h" 33 #include "queue_constants.h" 34 #include "queue_manager.h" 40 int goby::acomms::QueueManager::count_ = 0;
47 glog_push_group_ =
"goby::acomms::queue::push::" + as<std::string>(count_);
48 glog_pop_group_ =
"goby::acomms::queue::pop::" + as<std::string>(count_);
49 glog_priority_group_ =
"goby::acomms::queue::priority::" + as<std::string>(count_);
50 glog_out_group_ =
"goby::acomms::queue::out::" + as<std::string>(count_);
51 glog_in_group_ =
"goby::acomms::queue::in::" + as<std::string>(count_);
61 assert(ack.GetDescriptor() ==
62 google::protobuf::DescriptorPool::generated_pool()->FindMessageTypeByName(
63 "goby.acomms.protobuf.NetworkAck"));
65 assert(ack.GetDescriptor() == goby::util::DynamicProtobufManager::new_protobuf_message(
66 "goby.acomms.protobuf.NetworkAck")
71 const google::protobuf::Descriptor* desc,
77 codec_->validate(desc);
79 catch (DCCLException& e)
81 throw(
QueueException(
"could not create queue for message: " + desc->full_name() +
82 " because it failed DCCL validation: " + e.what()));
86 unsigned dccl_id = codec_->id(desc);
87 if (queues_.count(dccl_id))
89 glog.is(DEBUG1) &&
glog << group(glog_push_group_)
90 <<
"Updating config for queue: " << desc->full_name()
91 <<
" with: " << queue_cfg.ShortDebugString() << std::endl;
93 queues_.find(dccl_id)->second->set_cfg(queue_cfg);
98 if (queues_.count(dccl_id))
100 std::stringstream ss;
101 ss <<
"Queue: duplicate message specified for DCCL ID: " << dccl_id;
106 std::pair<std::map<unsigned, boost::shared_ptr<Queue> >::iterator,
bool> new_q_pair =
107 queues_.insert(std::make_pair(
108 dccl_id, boost::shared_ptr<Queue>(
new Queue(desc,
this, queue_cfg))));
110 Queue& new_q = *((new_q_pair.first)->second);
114 glog.is(DEBUG1) &&
glog << group(glog_out_group_) <<
"Added new queue: \n" 115 << new_q << std::endl;
121 for (std::map<
unsigned, boost::shared_ptr<Queue> >::iterator it = queues_.begin(),
125 std::vector<boost::shared_ptr<google::protobuf::Message> > expired_msgs =
126 it->second->expire();
128 BOOST_FOREACH (boost::shared_ptr<google::protobuf::Message> expire, expired_msgs)
131 if (network_ack_src_ids_.count(meta_from_msg(*expire).src()))
132 create_network_ack(modem_id_, *expire, goby::acomms::protobuf::NetworkAck::EXPIRE);
145 const google::protobuf::Descriptor* desc = dccl_msg.GetDescriptor();
146 unsigned dccl_id = codec_->id(desc);
148 if (!queues_.count(dccl_id))
149 throw(
QueueException(
"No queue exists for message: " + desc->full_name() +
150 "; you must configure it first."));
153 boost::shared_ptr<google::protobuf::Message> new_dccl_msg(dccl_msg.New());
154 new_dccl_msg->CopyFrom(dccl_msg);
157 queues_.find(dccl_id)->second->push_message(new_dccl_msg, *meta);
159 queues_.find(dccl_id)->second->push_message(new_dccl_msg);
161 qsize(queues_[dccl_id].
get());
166 std::map<unsigned, boost::shared_ptr<Queue> >::iterator it = queues_.find(flush.dccl_id());
168 if (it != queues_.end())
171 glog.is(DEBUG1) &&
glog << group(glog_out_group_) << msg_string(it->second->descriptor())
172 <<
": flushed queue" << std::endl;
173 qsize(it->second.get());
177 glog.is(DEBUG1) &&
glog << group(glog_out_group_) << warn
178 <<
"Cannot find queue to flush: " << flush << std::endl;
184 *os <<
"= Begin QueueManager [[" << queues_.size() <<
" queues total]] =" << std::endl;
185 for (std::map<
unsigned, boost::shared_ptr<Queue> >::const_iterator it = queues_.begin(),
188 info(it->second->descriptor(), os);
189 *os <<
"= End QueueManager =";
193 std::ostream* os)
const 195 std::map<unsigned, boost::shared_ptr<Queue> >::const_iterator it =
196 queues_.find(codec_->id(desc));
198 if (it != queues_.end())
199 it->second->info(os);
201 *os <<
"No such queue [[" << desc->full_name() <<
"]] loaded" 204 codec_->info(desc, os);
225 for (
unsigned frame_number = msg->frame_start(),
226 total_frames = msg->max_num_frames() + msg->frame_start();
227 frame_number < total_frames; ++frame_number)
229 std::string* data = 0;
230 if ((frame_number - msg->frame_start()) < (
unsigned)msg->frame_size())
231 data = msg->mutable_frame(frame_number - msg->frame_start());
233 data = msg->add_frame();
234 unsigned original_data_size = data->size();
236 glog.is(DEBUG2) &&
glog << group(glog_priority_group_) <<
"Finding next sender: " << *msg
240 Queue* winning_queue = find_next_sender(*msg, *data,
true);
245 msg->set_dest(packet_dest_);
246 glog.is(DEBUG1) &&
glog << group(glog_out_group_)
247 <<
"No data found. sending empty message to modem driver." 252 std::list<QueuedMessage> dccl_msgs;
255 bool using_encrypted_body =
false;
256 std::string passthrough_message;
258 while (winning_queue)
261 QueuedMessage next_user_frame = winning_queue->give_data(frame_number);
263 if (next_user_frame.meta.has_encoded_message())
265 using_encrypted_body =
true;
266 passthrough_message = next_user_frame.meta.encoded_message();
269 glog.is(DEBUG1) &&
glog << group(glog_out_group_)
270 << msg_string(winning_queue->descriptor())
271 <<
": sending data to modem driver (destination: " 272 << next_user_frame.meta.dest() <<
")" << std::endl;
274 if (manip_manager_.has(codec_->id(winning_queue->descriptor()),
275 protobuf::LOOPBACK_AS_SENT))
278 glog << group(glog_out_group_) << winning_queue->descriptor()->full_name()
279 <<
": LOOPBACK_AS_SENT manipulator set, sending back to decoder" 288 if (next_user_frame.meta.ack_requested())
290 glog.is(DEBUG2) &&
glog << group(glog_out_group_)
291 <<
"inserting ack for queue: " << *winning_queue
292 <<
" for frame: " << frame_number << std::endl;
293 waiting_for_ack_.insert(
294 std::pair<unsigned, Queue*>(frame_number, winning_queue));
298 glog.is(DEBUG2) &&
glog << group(glog_out_group_)
299 <<
"no ack, popping from queue: " << *winning_queue
301 if (!winning_queue->pop_message(frame_number))
302 glog.is(DEBUG1) &&
glog << group(glog_out_group_)
303 <<
"failed to pop from queue: " << *winning_queue
306 qsize(winning_queue);
310 if (packet_ack_ ==
false)
311 packet_ack_ = next_user_frame.meta.ack_requested();
317 if (frame_number == msg->frame_start())
321 msg->set_dest(next_user_frame.meta.dest());
323 if (msg->src() == QUERY_SOURCE_ID)
324 msg->set_src(modem_id_);
327 packet_dest_ = msg->dest();
342 next_user_frame.meta = meta_from_msg(*next_user_frame.dccl_msg);
343 dccl_msgs.push_back(next_user_frame);
346 if (using_encrypted_body)
353 unsigned repeated_size_bytes = size_repeated(dccl_msgs);
355 glog.is(DEBUG2) &&
glog << group(glog_out_group_) <<
"Size repeated " 356 << repeated_size_bytes << std::endl;
357 data->resize(repeated_size_bytes + original_data_size);
360 if ((msg->max_frame_bytes() - data->size()) > 0)
363 winning_queue = find_next_sender(*msg, *data,
false);
375 if (using_encrypted_body)
378 glog << group(glog_out_group_)
379 <<
"Encoding head only, passing through (encrypted?) body." 382 *data = data->substr(0, original_data_size);
384 if (dccl_msgs.size() > 1)
386 std::list<QueuedMessage>::iterator it_back = dccl_msgs.end();
389 encode_repeated(std::list<QueuedMessage>(dccl_msgs.begin(), it_back));
393 codec_->encode(&head, *dccl_msgs.back().dccl_msg,
true);
394 *data += head + passthrough_message.substr(head.size());
398 *data = data->substr(0, original_data_size) + encode_repeated(dccl_msgs);
401 catch (DCCLException& e)
404 glog.is(DEBUG1) &&
glog << group(glog_out_group_) << warn
405 <<
"Failed to encode, discarding message:" << e.what()
411 msg->set_ack_requested(packet_ack_);
414 std::string goby::acomms::QueueManager::encode_repeated(
const std::list<QueuedMessage>& msgs)
419 if (encrypt_rules_.size())
422 std::map<ModemId, std::string>::const_iterator it =
423 encrypt_rules_.find(msg.meta.dest());
425 if (it != encrypt_rules_.end())
427 cfg.set_crypto_passphrase(it->second);
430 codec_->merge_cfg(cfg);
434 codec_->encode(&piece, *(msg.dccl_msg));
440 std::list<goby::acomms::QueuedMessage>
441 goby::acomms::QueueManager::decode_repeated(
const std::string& orig_bytes)
443 std::string bytes = orig_bytes;
444 std::list<QueuedMessage> out;
445 while (!bytes.empty())
451 if (encrypt_rules_.size())
453 boost::shared_ptr<google::protobuf::Message> header =
454 codec_->decode<boost::shared_ptr<google::protobuf::Message> >(bytes,
true);
456 msg.meta = meta_from_msg(*header);
459 std::map<ModemId, std::string>::const_iterator it =
460 encrypt_rules_.find(msg.meta.src());
462 if (it != encrypt_rules_.end())
464 cfg.set_crypto_passphrase(it->second);
467 codec_->merge_cfg(cfg);
470 msg.dccl_msg = codec_->decode<boost::shared_ptr<google::protobuf::Message> >(bytes);
472 if (!encrypt_rules_.size())
473 msg.meta = meta_from_msg(*(msg.dccl_msg));
476 unsigned last_size = codec_->size(*(out.back().dccl_msg));
477 glog.is(common::logger::DEBUG1) &&
glog << group(glog_in_group_)
478 <<
"last message size was: " << last_size
480 bytes.erase(0, last_size);
482 catch (dccl::Exception& e)
488 glog.is(common::logger::WARN) &&
489 glog << group(glog_in_group_) <<
"failed to decode " 490 << goby::util::hex_encode(bytes) <<
" but returning parts already decoded" 499 unsigned goby::acomms::QueueManager::size_repeated(
const std::list<QueuedMessage>& msgs)
503 out += codec_->size(*(msg.dccl_msg));
509 for (std::multimap<unsigned, Queue*>::iterator it = waiting_for_ack_.begin(),
510 end = waiting_for_ack_.end();
513 if (it->second->clear_ack_queue(message.frame_start()))
514 waiting_for_ack_.erase(it++);
519 packet_ack_ = message.has_ack_requested() ? message.ack_requested() :
false;
520 packet_dest_ = message.dest();
525 const std::string& data,
bool first_user_frame)
528 double winning_priority;
529 boost::posix_time::ptime winning_last_send_time;
531 Queue* winning_queue = 0;
533 glog.is(DEBUG1) &&
glog << group(glog_priority_group_) <<
"Starting priority contest\n" 534 <<
"\tRequesting " << request_msg.max_num_frames() <<
" frame(s), have " 535 << data.size() <<
"/" << request_msg.max_frame_bytes() <<
"B" 538 for (std::map<
unsigned, boost::shared_ptr<Queue> >::iterator it = queues_.begin(),
542 Queue& q = *(it->second);
545 if (manip_manager_.has(codec_->id(q.descriptor()), protobuf::ON_DEMAND) &&
546 (!q.size() || q.newest_msg_time() + boost::posix_time::microseconds(
547 cfg_.on_demand_skew_seconds() * 1e6) <
550 boost::shared_ptr<google::protobuf::Message> new_msg =
551 goby::util::DynamicProtobufManager::new_protobuf_message(q.descriptor());
554 if (new_msg->IsInitialized())
559 boost::posix_time::ptime last_send_time;
560 if (q.get_priority_values(&priority, &last_send_time, request_msg, data))
563 if ((!winning_queue || priority > winning_priority ||
564 (priority == winning_priority && last_send_time < winning_last_send_time)))
566 winning_priority = priority;
567 winning_last_send_time = last_send_time;
573 glog.is(DEBUG1) &&
glog << group(glog_priority_group_) <<
"\t" 574 <<
"all other queues have no messages" << std::endl;
578 glog.is(DEBUG1) &&
glog << group(glog_priority_group_) << winning_queue->name()
579 <<
" has highest priority." << std::endl;
583 glog.is(DEBUG1) &&
glog << group(glog_priority_group_) <<
"ending priority contest" 587 return winning_queue;
592 for (
int i = 0, n = ack_msg.acked_frame_size(); i < n; ++i)
594 int frame_number = ack_msg.acked_frame(i);
595 if (ack_msg.dest() != modem_id_)
597 glog.is(WARN) &&
glog << group(glog_in_group_)
598 <<
"ignoring ack for modem_id = " << ack_msg.dest() << std::endl;
601 else if (!waiting_for_ack_.count(frame_number))
603 glog.is(DEBUG1) &&
glog << group(glog_in_group_)
604 <<
"got ack but we were not expecting one from " 605 << ack_msg.src() <<
" for frame " << frame_number << std::endl;
611 glog.is(DEBUG1) &&
glog << group(glog_in_group_) <<
"received ack for us from " 612 << ack_msg.src() <<
" for frame " << frame_number << std::endl;
614 std::multimap<unsigned, Queue*>::iterator it = waiting_for_ack_.find(frame_number);
615 while (it != waiting_for_ack_.end())
617 Queue* q = it->second;
619 boost::shared_ptr<google::protobuf::Message> removed_msg;
620 if (!q->pop_message_ack(frame_number, removed_msg))
622 glog.is(DEBUG1) &&
glog << group(glog_in_group_) << warn
623 <<
"failed to pop message from " << q->name()
630 if (network_ack_src_ids_.count(meta_from_msg(*removed_msg).src()))
631 create_network_ack(ack_msg.src(), *removed_msg,
632 goby::acomms::protobuf::NetworkAck::ACK);
635 glog.is(DEBUG2) &&
glog << group(glog_in_group_) << ack_msg << std::endl;
637 waiting_for_ack_.erase(it);
639 it = waiting_for_ack_.find(frame_number);
651 glog.is(DEBUG2) &&
glog << group(glog_in_group_) <<
"Received message" 652 <<
": " << modem_message << std::endl;
654 if (modem_message.type() == protobuf::ModemTransmission::ACK)
656 process_modem_ack(modem_message);
660 for (
int frame_number = 0, total_frames = modem_message.frame_size();
661 frame_number < total_frames; ++frame_number)
665 glog.is(DEBUG1) &&
glog << group(glog_in_group_) <<
"Received DATA message from " 666 << modem_message.src() << std::endl;
668 std::list<QueuedMessage> dccl_msgs;
670 if (!cfg_.skip_decoding())
672 dccl_msgs = decode_repeated(modem_message.frame(frame_number));
675 BOOST_FOREACH (
const QueuedMessage& decoded_message, dccl_msgs)
679 int32 dest = meta_msg.dest();
681 const google::protobuf::Descriptor* desc =
682 decoded_message.dccl_msg->GetDescriptor();
685 !manip_manager_.has(codec_->id(desc), protobuf::PROMISCUOUS))
687 glog.is(DEBUG1) &&
glog << group(glog_in_group_)
688 <<
"ignoring DCCL message for modem_id = " << dest
691 else if (dest ==
BROADCAST_ID && meta_msg.src() == modem_id_)
693 glog.is(DEBUG1) &&
glog << group(glog_in_group_)
694 <<
"ignoring broadcast message that we sent" 697 else if (manip_manager_.has(codec_->id(desc), protobuf::NO_DEQUEUE))
699 glog.is(DEBUG1) &&
glog << group(glog_in_group_)
700 <<
"ignoring message: " << desc->full_name()
701 <<
" because NO_DEQUEUE manipulator set" 710 catch (DCCLException& e)
712 glog.is(DEBUG1) &&
glog << group(glog_in_group_) << warn <<
"failed to decode " 713 << e.what() << std::endl;
721 boost::shared_ptr<google::protobuf::Message> decoded_message =
722 codec_->decode<boost::shared_ptr<google::protobuf::Message> >(
723 modem_message.frame(frame_number),
true);
726 if (modem_message.dest() == modem_id_ ||
727 (route_additional_modem_ids_.count(modem_message.dest())))
729 meta_msg.set_encoded_message(modem_message.frame(frame_number));
730 meta_msg.set_non_repeated_size(meta_msg.encoded_message().size());
735 catch (DCCLException& e)
737 glog.is(DEBUG1) &&
glog << group(glog_in_group_) << warn
738 <<
"failed to decode header for routing: " << e.what()
757 void goby::acomms::QueueManager::process_cfg()
759 modem_id_ = cfg_.modem_id();
760 manip_manager_.clear();
761 network_ack_src_ids_.clear();
762 route_additional_modem_ids_.clear();
763 encrypt_rules_.clear();
765 for (
int i = 0, n = cfg_.message_entry_size(); i < n; ++i)
767 const google::protobuf::Descriptor* desc =
768 goby::util::DynamicProtobufManager::find_descriptor(
769 cfg_.message_entry(i).protobuf_name());
774 for (
int j = 0, m = cfg_.message_entry(i).manipulator_size(); j < m; ++j)
775 { manip_manager_.add(codec_->id(desc), cfg_.message_entry(i).manipulator(j)); } }
779 glog << group(glog_push_group_) << warn
780 <<
"No message by the name: " << cfg_.message_entry(i).protobuf_name()
781 <<
" found, not setting Queue options for this type." << std::endl;
785 for (
int i = 0, n = cfg_.make_network_ack_for_src_id_size(); i < n; ++i)
788 glog << group(glog_push_group_)
789 <<
"Generating NetworkAck for messages required ACK from source ID: " 790 << cfg_.make_network_ack_for_src_id(i) << std::endl;
792 network_ack_src_ids_.insert(cfg_.make_network_ack_for_src_id(i));
795 for (
int i = 0, n = cfg_.route_for_additional_modem_id_size(); i < n; ++i)
797 glog.is(DEBUG1) &&
glog << group(glog_push_group_)
798 <<
"Also routing messages addressed to link layer destination ID: " 799 << cfg_.route_for_additional_modem_id(i) << std::endl;
801 route_additional_modem_ids_.insert(cfg_.route_for_additional_modem_id(i));
804 for (
int i = 0, n = cfg_.encrypt_rule_size(); i < n; ++i)
806 glog.is(DEBUG1) &&
glog << group(glog_push_group_)
807 <<
"Adding encrypt rule for ModemId: " << cfg_.encrypt_rule(i).id()
810 encrypt_rules_[cfg_.encrypt_rule(i).id()] = cfg_.encrypt_rule(i).crypto_passphrase();
814 void goby::acomms::QueueManager::qsize(
Queue* q)
817 size.set_dccl_id(codec_->id(q->descriptor()));
818 size.set_size(q->size());
822 void goby::acomms::QueueManager::create_network_ack(
824 goby::acomms::protobuf::NetworkAck::AckType ack_type)
826 if (orig_msg.GetDescriptor()->full_name() ==
"goby.acomms.protobuf.NetworkAck")
828 glog.is(DEBUG1) &&
glog << group(glog_in_group_)
829 <<
"Not generating network ack from NetworkAck to avoid infinite " 830 "proliferation of ACKS." 835 ack.set_ack_src(ack_src);
836 ack.set_message_dccl_id(
DCCLCodec::get()->
id(orig_msg.GetDescriptor()));
840 if (!network_ack_src_ids_.count(meta.src()))
842 glog.is(DEBUG1) &&
glog << group(glog_in_group_)
843 <<
"Not generating network ack for message from source ID: " 844 << meta.src() <<
" as we weren't asked to do so." << std::endl;
848 ack.set_message_src(meta.src());
849 ack.set_message_dest(meta.dest());
850 ack.set_message_time(meta.time());
851 ack.set_ack_type(ack_type);
853 glog.is(VERBOSE) &&
glog << group(glog_in_group_)
854 <<
"Generated network ack: " << ack.DebugString()
855 <<
"from: " << orig_msg.DebugString() << std::endl;
859 if (network_ack_meta.dest() == modem_id_)
provides an API to the goby-acomms Queuing Library.
boost::signals2::signal< void(const protobuf::ModemTransmission &ack_msg, const google::protobuf::Message &orig_msg)> signal_ack
Signals when acknowledgment of proper message receipt has been received. This is only sent for queues...
static DCCLCodec * get()
DCCLCodec is a singleton class; use this to get a pointer to the class.
boost::signals2::signal< void(const protobuf::ModemTransmission &request_msg, google::protobuf::Message *data_msg)> signal_data_on_demand
Forwards the data request to the application layer. This advanced feature is used when queue...
boost::signals2::signal< void(const google::protobuf::Message &msg)> signal_receive
Signals when a DCCL message is received.
Exception class for libdccl.
void handle_modem_data_request(protobuf::ModemTransmission *msg)
Finds data to send to the modem.
void add_queue(const protobuf::QueuedMessageEntry &queue_cfg)
Add a DCCL queue for use with QueueManager. Note that the queue must be added before receiving messag...
boost::signals2::signal< void(const protobuf::QueuedMessageMeta &meta, const google::protobuf::Message &data_msg, int modem_id)> signal_in_route
Used by a router to intercept messages and requeue them if desired.
void merge_cfg(const protobuf::QueueManagerConfig &cfg)
Set (and merge "repeat" fields) the current configuration. (protobuf::QueueManagerConfig defined in a...
void handle_modem_receive(const protobuf::ModemTransmission &message)
Receive incoming data from the modem.
ReturnType goby_time()
Returns current UTC time as a boost::posix_time::ptime.
const int BROADCAST_ID
special modem id for the broadcast destination - no one is assigned this address. Analogous to 192...
void add_group(const std::string &name, Colors::Color color=Colors::nocolor, const std::string &description="")
Add another group to the logger. A group provides related manipulator for categorizing log messages...
std::ostream & operator<<(std::ostream &out, const QueueManager &d)
outputs information about all available messages (same as info_all)
common::FlexOstream glog
Access the Goby logger through this object.
The global namespace for the Goby project.
boost::signals2::signal< void(const google::protobuf::Message &orig_msg)> signal_expire
Signals when a message expires (exceeds its time-to-live or ttl) before being sent (if queue...
void set_cfg(const protobuf::QueueManagerConfig &cfg)
Set (and overwrite completely if present) the current configuration. (protobuf::QueueManagerConfig de...
boost::signals2::signal< void(protobuf::QueueSize size)> signal_queue_size_change
Signals when any queue changes size (message is popped or pushed)
void info_all(std::ostream *os) const
Writes a human readable summary (including DCCLCodec info) of all loaded queues.
void do_work()
Calculates which messages have expired and emits the goby::acomms::QueueManager::signal_expire as nec...
void info(std::ostream *os) const
Writes a human readable summary (including DCCLCodec info) of the queue for the provided DCCL type to...
void flush_queue(const protobuf::QueueFlush &flush)
Flush (delete all messages in) a queue.
void push_message(const google::protobuf::Message &new_message)
Push a message (and add the queue if it does not exist)
QueueManager()
constructor
google::protobuf::int32 int32
a signed 32-bit integer
const int QUERY_DESTINATION_ID
special modem id used internally to goby-acomms for indicating that the MAC layer (amac) is agnostic ...