// Includes for the acomms constants, the DCCL codec, the logger, the dynamic protobuf
// manager and the QueueManager header, followed (on the same listing line) by the start
// of the Queue constructor.
// The constructor stores the message descriptor, the parent QueueManager and the
// per-message configuration, and initializes last_send_time_ from goby_time().
// NOTE(review): the constructor body is not present in this listing.
23 #include "goby/acomms/acomms_constants.h" 24 #include "goby/acomms/dccl.h" 25 #include "goby/common/logger.h" 26 #include "goby/util/dynamic_protobuf_manager.h" 29 #include "queue_manager.h" 36 goby::acomms::Queue::Queue(
const google::protobuf::Descriptor* desc, QueueManager* parent,
37 const protobuf::QueuedMessageEntry& cfg)
38 : desc_(desc), parent_(parent), cfg_(cfg), last_send_time_(
goby_time())
// Convenience overload: derives the queue metadata from the message with meta_from_msg()
// and forwards to the two-argument push_message(), returning that call's result.
44 bool goby::acomms::Queue::push_message(boost::shared_ptr<google::protobuf::Message> dccl_msg)
46 protobuf::QueuedMessageMeta meta = meta_from_msg(*dccl_msg);
47 return push_message(dccl_msg, meta);
// Pushes a DCCL message with caller-supplied metadata onto this queue.
// The visible branches handle the LOOPBACK and NO_QUEUE manipulators, messages addressed
// to the local modem id, empty messages, and trimming when max_queue is exceeded.
// NOTE(review): brace-only lines and the return statements are absent from this listing,
// so the value returned by each branch cannot be read off here.
50 bool goby::acomms::Queue::push_message(boost::shared_ptr<google::protobuf::Message> dccl_msg,
51 protobuf::QueuedMessageMeta meta)
// LOOPBACK manipulator, and the meta carries no already-encoded message: log and hand the
// message back through the parent's signal_receive.
54 if (parent_->manip_manager_.has(
id(), protobuf::LOOPBACK) && !meta.has_encoded_message())
56 glog.is(DEBUG1) &&
glog << group(parent_->glog_push_group())
57 << parent_->msg_string(dccl_msg->GetDescriptor())
58 <<
": LOOPBACK manipulator set, sending back to decoder" 60 parent_->signal_receive(*dccl_msg);
// Emit the parent's signal_out_route with the meta (by pointer), the message and the
// modem id from the parent's configuration.
63 parent_->signal_out_route(&meta, *dccl_msg, parent_->cfg_.modem_id());
66 glog << group(parent_->glog_push_group()) << parent_->msg_string(dccl_msg->GetDescriptor())
67 <<
": attempting to push message (destination: " << meta.dest() <<
")" << std::endl;
// NO_QUEUE manipulator: the message is logged as not being queued.
70 if (parent_->manip_manager_.has(
id(), protobuf::NO_QUEUE))
72 glog.is(DEBUG1) &&
glog << group(parent_->glog_push_group())
73 << parent_->msg_string(dccl_msg->GetDescriptor())
74 <<
": not queuing: NO_QUEUE manipulator is set" << std::endl;
// Destination equals our own modem id: deliver through signal_receive rather than the
// physical interface.
78 else if (meta.dest() == parent_->modem_id_)
80 glog.is(DEBUG1) &&
glog << group(parent_->glog_push_group())
81 <<
"Message is for us: using loopback, not physical interface" 84 parent_->signal_receive(*dccl_msg);
// When an ACK is requested (by the meta or by the queue's options), build an ACK
// transmission and emit signal_ack; both src and dest are taken from meta.dest().
87 if ((meta.has_ack_requested() && meta.ack_requested()) || queue_message_options().ack())
89 protobuf::ModemTransmission ack_msg;
91 ack_msg.set_src(meta.dest());
92 ack_msg.set_dest(meta.dest());
93 ack_msg.set_type(protobuf::ModemTransmission::ACK);
95 parent_->signal_ack(ack_msg, *dccl_msg);
// NOTE(review): the statement guarded by this check is missing from the listing;
// presumably it stamps the meta with the current time -- confirm against the full source.
100 if (!meta.has_time())
// A non_repeated_size of zero is reported as an attempt to push an empty message.
103 if (meta.non_repeated_size() == 0)
105 goby::glog.is(DEBUG1) &&
glog << group(parent_->glog_out_group()) << warn
106 <<
"empty message attempted to be pushed to queue " << name()
// Fall back to the queue's configured ack option when the meta does not specify one,
// then append the message and its meta to messages_.
111 if (!meta.has_ack_requested())
112 meta.set_ack_requested(queue_message_options().ack());
113 messages_.push_back(QueuedMessage());
114 messages_.back().meta = meta;
115 messages_.back().dccl_msg = dccl_msg;
117 glog.is(DEBUG1) &&
glog << group(parent_->glog_push_group())
118 <<
"pushed to send stack (queue size " << size() <<
"/" 119 << queue_message_options().max_queue() <<
")" << std::endl;
121 glog.is(DEBUG2) &&
glog << group(parent_->glog_push_group()) <<
"Message: " << *dccl_msg
123 glog.is(DEBUG2) &&
glog << group(parent_->glog_push_group()) <<
"Meta: " << meta << std::endl;
// Enforce max_queue (a value of 0 skips this check). The candidate for removal is
// begin() when newest_first is set, otherwise end().
// NOTE(review): the body of the `== messages_.end()` check is not shown; end() cannot be
// erased, so it presumably steps back one element -- confirm.
126 if (queue_message_options().max_queue() &&
127 messages_.size() > queue_message_options().max_queue())
129 messages_it it_to_erase =
130 queue_message_options().newest_first() ? messages_.begin() : messages_.end();
133 if (it_to_erase == messages_.end())
// Drop any pending-ack entry that refers to the element about to be erased, so
// waiting_for_ack_ does not keep an iterator to a removed message.
137 waiting_for_ack_it it = find_ack_value(it_to_erase);
138 if (it != waiting_for_ack_.end())
139 waiting_for_ack_.erase(it);
141 glog.is(DEBUG1) &&
glog << group(parent_->glog_pop_group()) <<
"queue exceeded for " 142 << name() <<
". removing: " << it_to_erase->meta << std::endl;
144 messages_.erase(it_to_erase);
// NOTE(review): the signature for this body is missing from the listing; it appears to be
// meta_from_msg(), which the one-argument push_message() calls -- confirm.
// Builds a QueuedMessageMeta starting from static_meta_, records the size reported by
// parent_->codec_->size(), then reads the message fields assigned to the DESTINATION_ID,
// SOURCE_ID and TIMESTAMP roles.
153 protobuf::QueuedMessageMeta meta = static_meta_;
154 meta.set_non_repeated_size(parent_->codec_->size(dccl_msg));
// Destination role: any of the four protobuf integer types is accepted; any other
// non-empty value throws QueueException.
// NOTE(review): the declaration of `dest` and the call that stores it in meta are not
// present in this listing.
156 if (!roles_[protobuf::QueuedMessageEntry::DESTINATION_ID].empty())
158 boost::any field_value =
159 find_queue_field(roles_[protobuf::QueuedMessageEntry::DESTINATION_ID], dccl_msg);
162 if (field_value.type() ==
typeid(
int32))
163 dest = boost::any_cast<
int32>(field_value);
164 else if (field_value.type() ==
typeid(
int64))
165 dest = boost::any_cast<
int64>(field_value);
166 else if (field_value.type() ==
typeid(
uint32))
167 dest = boost::any_cast<
uint32>(field_value);
168 else if (field_value.type() ==
typeid(
uint64))
169 dest = boost::any_cast<
uint64>(field_value);
170 else if (!field_value.empty())
171 throw(QueueException(
"Invalid type " + std::string(field_value.type().name()) +
172 " given for (queue_field).is_dest. Expected integer type"));
175 <<
"setting dest to " << dest << std::endl;
// Source role: same type handling as the destination role above.
// NOTE(review): the declaration of `src` and the call that stores it in meta are not
// present in this listing.
180 if (!roles_[protobuf::QueuedMessageEntry::SOURCE_ID].empty())
182 boost::any field_value =
183 find_queue_field(roles_[protobuf::QueuedMessageEntry::SOURCE_ID], dccl_msg);
186 if (field_value.type() ==
typeid(
int32))
187 src = boost::any_cast<
int32>(field_value);
188 else if (field_value.type() ==
typeid(
int64))
189 src = boost::any_cast<
int64>(field_value);
190 else if (field_value.type() ==
typeid(
uint32))
191 src = boost::any_cast<
uint32>(field_value);
192 else if (field_value.type() ==
typeid(
uint64))
193 src = boost::any_cast<
uint64>(field_value);
194 else if (!field_value.empty())
195 throw(QueueException(
"Invalid type " + std::string(field_value.type().name()) +
196 " given for (queue_field).is_src. Expected integer type"));
199 <<
"setting source to " << src << std::endl;
// Timestamp role: a uint64 is stored as-is; a double is cast to uint64 and then
// multiplied by 1e6; a std::string is converted through boost::posix_time::ptime.
// NOTE(review): in the double branch the cast happens before the multiplication, so the
// fractional part of the seconds value is discarded before scaling. The error text below
// describes the double as seconds and the uint64 as microseconds, so the cast probably
// should wrap the product -- confirm and fix in the full source.
204 if (!roles_[protobuf::QueuedMessageEntry::TIMESTAMP].empty())
206 boost::any field_value =
207 find_queue_field(roles_[protobuf::QueuedMessageEntry::TIMESTAMP], dccl_msg);
209 if (field_value.type() ==
typeid(
uint64))
210 meta.set_time(boost::any_cast<uint64>(field_value));
211 else if (field_value.type() ==
typeid(
double))
212 meta.set_time(static_cast<uint64>(boost::any_cast<double>(field_value)) * 1e6);
213 else if (field_value.type() ==
typeid(std::string))
214 meta.set_time(goby::util::as<uint64>(goby::util::as<boost::posix_time::ptime>(
215 boost::any_cast<std::string>(field_value))));
216 else if (!field_value.empty())
218 QueueException(
"Invalid type " + std::string(field_value.type().name()) +
219 " given for (goby.field).queue.is_time. Expected uint64 contained " 220 "microseconds since UNIX, double containing seconds since UNIX or " 221 "std::string containing as<std::string>(boost::posix_time::ptime)"));
224 << group(parent_->glog_push_group_) <<
"setting time to " 225 << as<boost::posix_time::ptime>(meta.time()) << std::endl;
// Log the assembled meta at DEBUG2. The return statement is not present in this listing.
228 glog.is(DEBUG2) &&
glog << group(parent_->glog_push_group()) <<
"Meta: " << meta << std::endl;
// Looks up a field by (possibly dotted) name in a message and returns its value as a
// boost::any. The name is split on '.', and each component is resolved with
// FindFieldByName against the current descriptor.
// Throws QueueException for an unknown field name, for a repeated field, and when a
// non-message field is followed by further components.
// NOTE(review): the remaining parameter(s), the conditions guarding some of the throws and
// the descent into the child message are on lines missing from this listing.
232 boost::any goby::acomms::Queue::find_queue_field(
const std::string& field_name,
236 const google::protobuf::Descriptor* current_desc = current_msg->GetDescriptor();
239 std::vector<std::string> field_names;
240 boost::split(field_names, field_name, boost::is_any_of(
"."));
242 for (
int i = 0, n = field_names.size(); i < n; ++i)
244 const google::protobuf::FieldDescriptor* field_desc =
245 current_desc->FindFieldByName(field_names[i]);
247 throw(QueueException(
"No such field called " + field_name +
" in msg " +
248 current_desc->full_name()));
// Roles can only be bound to singular fields.
250 if (field_desc->is_repeated())
251 throw(QueueException(
"Cannot assign a Queue role to a repeated field"));
// The DCCL type helper extracts the field's value from the message as a boost::any.
253 boost::shared_ptr<FromProtoCppTypeBase> helper =
254 goby::acomms::DCCLTypeHelper::find(field_desc);
259 return helper->get_value(field_desc, *current_msg);
// More name components remain: the current field must itself be a message.
261 else if (field_desc->type() != google::protobuf::FieldDescriptor::TYPE_MESSAGE)
263 throw(QueueException(
"Cannot access child fields of a non-message field: " +
268 boost::any value = helper->get_value(field_desc, *current_msg);
274 current_desc = current_msg->GetDescriptor();
// Selects the next message to send. The search starts at end() when newest_first is set,
// otherwise at begin(), and skips entries that are still present in waiting_for_ack_,
// stepping backward for newest_first and forward otherwise.
// NOTE(review): the body of the `== messages_.end()` check and the return statement are
// not present in this listing.
282 goby::acomms::messages_it goby::acomms::Queue::next_message_it()
284 messages_it it_to_give =
285 queue_message_options().newest_first() ? messages_.end() : messages_.begin();
286 if (it_to_give == messages_.end())
290 while (find_ack_value(it_to_give) != waiting_for_ack_.end())
291 queue_message_options().newest_first() ? --it_to_give : ++it_to_give;
// NOTE(review): the signature for this body is missing from the listing; `frame` is used
// below but its declaration is not visible here.
// Takes the message chosen by next_message_it(), reconciles its ack flag with the
// destination (the log text states that BROADCAST cannot ACK), records the message in
// waiting_for_ack_ keyed by `frame`, and stamps the meta's last_sent_time from
// last_send_time_.
298 messages_it it_to_give = next_message_it();
300 bool ack = it_to_give->meta.ack_requested();
302 if (it_to_give->meta.dest() ==
BROADCAST_ID && ack ==
true)
304 glog.is(DEBUG1) &&
glog << group(parent_->glog_pop_group()) << parent_->msg_string(desc_)
305 <<
": setting ack=false because BROADCAST (0) cannot ACK messages" 310 it_to_give->meta.set_ack_requested(ack);
// NOTE(review): this insert may be guarded by a condition on a line not shown -- confirm.
313 waiting_for_ack_.insert(std::pair<unsigned, messages_it>(frame, it_to_give));
316 it_to_give->meta.set_last_sent_time(util::as<goby::uint64>(last_send_time_));
// Reports this queue's priority and last send time through the out-parameters and
// decides whether its next message fits the transmission described by request_msg.
// The visible checks log a reason when the queue is in blackout, when the next message
// is too large for the remaining frame bytes, when it has the wrong destination, or when
// it requires an ACK that the packet does not.
// NOTE(review): return statements and parts of several conditions are on lines missing
// from this listing.
322 bool goby::acomms::Queue::get_priority_values(
double* priority,
323 boost::posix_time::ptime* last_send_time,
324 const protobuf::ModemTransmission& request_msg,
325 const std::string& data)
// Tail of an expression (presumably the priority computation) whose start is not shown.
328 queue_message_options().ttl() * queue_message_options().value_base();
330 *last_send_time = last_send_time_;
// Every queued message is already waiting for an ack, so there is nothing new to offer.
333 if (messages_.size() <= waiting_for_ack_.size())
336 protobuf::QueuedMessageMeta& next_msg = next_message_it()->meta;
// Blackout: last_send_time_ plus blackout_time() seconds is compared against a value on a
// line not shown (presumably the current time).
341 if (last_send_time_ + boost::posix_time::seconds(queue_message_options().blackout_time()) >
344 glog.is(DEBUG1) &&
glog << group(parent_->glog_priority_group()) <<
"\t" << name()
345 <<
" is in blackout" << std::endl;
// NOTE(review): if data.size() exceeds max_frame_bytes() and the subtraction is carried
// out in an unsigned type, the result wraps to a very large value and this check would
// not reject the message -- verify the operand types.
349 else if (request_msg.has_max_frame_bytes() &&
350 (next_msg.non_repeated_size() > (request_msg.max_frame_bytes() - data.size())))
352 glog.is(DEBUG1) &&
glog << group(parent_->glog_priority_group()) <<
"\t" << name()
353 <<
" next message is too large {" << next_msg.non_repeated_size()
358 else if ((request_msg.has_dest() &&
361 || request_msg.dest() == next_msg.dest())))
363 glog.is(DEBUG1) &&
glog << group(parent_->glog_priority_group()) <<
"\t" << name()
364 <<
" next message has wrong destination (must be BROADCAST (0) or " 365 "same as first user-frame, is " 366 << next_msg.dest() <<
")" << std::endl;
370 else if ((request_msg.has_ack_requested() && !request_msg.ack_requested() &&
373 glog.is(DEBUG1) &&
glog << group(parent_->glog_priority_group()) <<
"\t" << name()
374 <<
" next message requires ACK and the packet does not" 380 glog.is(DEBUG1) &&
glog << group(parent_->glog_priority_group()) <<
"\t" << name() <<
" (" 381 << next_msg.non_repeated_size() <<
"B) has priority value" 382 <<
": " << *priority << std::endl;
// Scans messages_ starting from the back (newest_first) or the front, testing each
// entry's ack_requested flag and stepping toward the opposite end.
// NOTE(review): the actions taken inside each check, the loop construct and the return
// statements are on lines missing from this listing; how `frame` is used is not visible.
387 bool goby::acomms::Queue::pop_message(
unsigned frame)
389 std::list<QueuedMessage>::iterator back_it = messages_.end();
391 std::list<QueuedMessage>::iterator front_it = messages_.begin();
394 std::list<QueuedMessage>::iterator it =
395 queue_message_options().newest_first() ? back_it : front_it;
399 if (!it->meta.ack_requested())
// Reached the far end of the queue relative to the scan direction.
406 if (it == (queue_message_options().newest_first() ? front_it : back_it))
409 queue_message_options().newest_first() ? --it : ++it;
// Handles an acknowledgement for `frame`: if waiting_for_ack_ has an entry for it, the
// acknowledged message is handed back through removed_msg, the pop is logged, the message
// is erased from messages_, and the waiting_for_ack_ entry is removed last (its mapped
// iterator is still needed for the messages_ erase).
// NOTE(review): the return statements are not present in this listing.
414 bool goby::acomms::Queue::pop_message_ack(
unsigned frame,
415 boost::shared_ptr<google::protobuf::Message>& removed_msg)
418 if (waiting_for_ack_.count(frame))
421 waiting_for_ack_it it = waiting_for_ack_.find(frame);
422 removed_msg = (it->second)->dccl_msg;
424 stream_for_pop(*it->second);
427 messages_.erase(it->second);
429 waiting_for_ack_.erase(it);
// Logs the removal of a queued message: a DEBUG1 line with the queue size after the pop
// (size() - 1) against max_queue, and DEBUG2 lines with the message and its meta.
// NOTE(review): the DEBUG2 lines are written to the push group although this reports a
// pop -- confirm that is intentional.
439 void goby::acomms::Queue::stream_for_pop(
const QueuedMessage& queued_msg)
441 glog.is(DEBUG1) &&
glog << group(parent_->glog_pop_group()) << parent_->msg_string(desc_)
442 <<
": popping from send stack" 443 <<
" (queue size " << size() - 1 <<
"/" 444 << queue_message_options().max_queue() <<
")" << std::endl;
446 glog.is(DEBUG2) &&
glog << group(parent_->glog_push_group())
447 <<
"Message: " << *queued_msg.dccl_msg << std::endl;
448 glog.is(DEBUG2) &&
glog << group(parent_->glog_push_group()) <<
"Meta: " << queued_msg.meta
// Removes messages from the front of the queue whose meta time plus ttl() seconds is
// earlier than goby_time(), collecting the removed messages in the returned vector.
// Any waiting_for_ack_ entry that refers to an expired message is removed as well.
// NOTE(review): the loop exit for a front message that has not expired, and the return
// statement, are on lines missing from this listing.
452 std::vector<boost::shared_ptr<google::protobuf::Message> > goby::acomms::Queue::expire()
454 std::vector<boost::shared_ptr<google::protobuf::Message> > expired_msgs;
456 while (!messages_.empty())
458 if ((goby::util::as<boost::posix_time::ptime>(messages_.front().meta.time()) +
459 boost::posix_time::seconds(queue_message_options().ttl())) <
goby_time())
461 expired_msgs.push_back(messages_.front().dccl_msg);
462 glog.is(DEBUG1) &&
glog << group(parent_->glog_pop_group()) <<
"expiring" 463 <<
" from send stack " << name() <<
" " 464 << messages_.front().meta.time() <<
" (qsize " << size() - 1
465 <<
"/" << queue_message_options().max_queue()
466 <<
"): " << *messages_.front().dccl_msg << std::endl;
// Remove the pending-ack entry before the message itself so waiting_for_ack_ never holds
// an iterator to an erased element.
468 waiting_for_ack_it it = find_ack_value(messages_.begin());
469 if (it != waiting_for_ack_.end())
470 waiting_for_ack_.erase(it);
472 messages_.pop_front();
// Linear search of waiting_for_ack_ for the entry whose mapped message iterator equals
// it_to_find.
// NOTE(review): the return statements are not present in this listing; callers compare
// the result against waiting_for_ack_.end() to detect "not found".
483 goby::acomms::waiting_for_ack_it goby::acomms::Queue::find_ack_value(messages_it it_to_find)
485 waiting_for_ack_it n = waiting_for_ack_.end();
486 for (waiting_for_ack_it it = waiting_for_ack_.begin(); it != n; ++it)
488 if (it->second == it_to_find)
// Writes a human-readable summary of this queue to *os: the queue name, the number of
// queued messages, and the configuration rendered with ShortDebugString().
494 void goby::acomms::Queue::info(std::ostream* os)
const 496 *os <<
"== Begin Queue [[" << name() <<
"]] ==\n";
497 *os <<
"Contains " << messages_.size() <<
" message(s)." 499 <<
"Configured options: \n" 500 << cfg_.ShortDebugString();
501 *os <<
"\n== End Queue [[" << name() <<
"]] ==\n";
// Logs that the queue is being flushed and clears waiting_for_ack_.
// NOTE(review): the log text reports a queue size of 0, but the statement that empties
// messages_ is not present in this listing -- confirm against the full source.
504 void goby::acomms::Queue::flush()
506 glog.is(DEBUG1) &&
glog << group(parent_->glog_pop_group()) <<
"flushing stack " << name()
507 <<
" (qsize 0)" << std::endl;
509 waiting_for_ack_.clear();
// Drops pending-ack entries that are no longer expected to be acknowledged and returns
// whether waiting_for_ack_ is empty afterwards.
// An entry is dropped when its frame number is >= start_frame, or when its
// last_sent_time plus minimum_ack_wait_seconds (scaled by 1e6) is less than a value on a
// line not shown (presumably the current time in microseconds -- confirm).
// NOTE(review): the branch that advances `it` without erasing is not present in this
// listing.
512 bool goby::acomms::Queue::clear_ack_queue(
unsigned start_frame)
514 for (waiting_for_ack_it it = waiting_for_ack_.begin(), end = waiting_for_ack_.end(); it != end;)
518 if (it->first >= start_frame)
521 glog << group(parent_->glog_pop_group()) << name()
522 <<
": Clearing ack for queue because last_frame >= current_frame" << std::endl;
// Post-increment hands erase() the old position while `it` already points at the next
// entry.
523 waiting_for_ack_.erase(it++);
525 else if (it->second->meta.last_sent_time() +
526 parent_->cfg_.minimum_ack_wait_seconds() * 1e6 <
529 glog.is(DEBUG1) &&
glog << group(parent_->glog_pop_group()) << name()
530 <<
": Clearing ack for queue because " 531 << parent_->cfg_.minimum_ack_wait_seconds()
532 <<
" seconds has elapsed since last send. Last send:" 533 << it->second->meta.last_sent_time() << std::endl;
534 waiting_for_ack_.erase(it++);
541 return waiting_for_ack_.empty();
// Translates the role entries of cfg_ into static_meta_ and roles_.
// STATIC roles copy static_value into static_meta_ (destination or source; a static
// TIMESTAMP is rejected). FIELD_VALUE roles record the configured field name after
// checking it with find_queue_field() against a freshly created message of this queue's
// type. Each role type may be inserted into roles_ only once.
// Throws QueueException for a STATIC role without static_value, for a static TIMESTAMP,
// and for a role assigned more than once.
// NOTE(review): break statements and the condition guarding the final throw are on lines
// missing from this listing.
550 void goby::acomms::Queue::process_cfg()
553 static_meta_.Clear();
// Scratch message used only to validate configured field names.
556 boost::shared_ptr<google::protobuf::Message> new_msg =
557 goby::util::DynamicProtobufManager::new_protobuf_message(desc_);
559 for (
int i = 0, n = cfg_.role_size(); i < n; ++i)
561 std::string role_field;
563 switch (cfg_.role(i).setting())
565 case protobuf::QueuedMessageEntry::Role::STATIC:
567 if (!cfg_.role(i).has_static_value())
568 throw(QueueException(
569 "Role " + protobuf::QueuedMessageEntry::RoleType_Name(cfg_.role(i).type()) +
570 " is set to STATIC but has no `static_value`"));
572 switch (cfg_.role(i).type())
574 case protobuf::QueuedMessageEntry::DESTINATION_ID:
575 static_meta_.set_dest(cfg_.role(i).static_value());
578 case protobuf::QueuedMessageEntry::SOURCE_ID:
579 static_meta_.set_src(cfg_.role(i).static_value());
582 case protobuf::QueuedMessageEntry::TIMESTAMP:
583 throw(QueueException(
"TIMESTAMP role cannot be static"));
589 case protobuf::QueuedMessageEntry::Role::FIELD_VALUE:
591 role_field = cfg_.role(i).field();
// Called for validation: find_queue_field throws if the field name cannot be resolved.
594 find_queue_field(role_field, *new_msg);
598 typedef std::map<protobuf::QueuedMessageEntry::RoleType, std::string> Map;
// insert() reports through result.second whether the role type was already present.
600 std::pair<Map::iterator, bool> result =
601 roles_.insert(std::make_pair(cfg_.role(i).type(), role_field));
603 throw(QueueException(
"Role " +
604 protobuf::QueuedMessageEntry::RoleType_Name(cfg_.role(i).type()) +
605 " was assigned more than once. Each role must have at most one " 606 "field or static value per message."));
uint64 goby_time< uint64 >()
Returns current UTC time as integer microseconds since 1970-01-01 00:00:00.
google::protobuf::uint32 uint32
an unsigned 32 bit integer
ReturnType goby_time()
Returns current UTC time as a boost::posix_time::ptime.
const int BROADCAST_ID
special modem id for the broadcast destination - no one is assigned this address. Analogous to 192...
double time_duration2double(boost::posix_time::time_duration time_of_day)
time duration to double number of seconds: good to the microsecond
google::protobuf::int64 int64
a signed 64 bit integer
std::ostream & operator<<(std::ostream &out, const QueueManager &d)
outputs information about all available messages (same as info_all)
common::FlexOstream glog
Access the Goby logger through this object.
google::protobuf::uint64 uint64
an unsigned 64 bit integer
google::protobuf::int32 int32
a signed 32 bit integer
const int QUERY_DESTINATION_ID
special modem id used internally to goby-acomms for indicating that the MAC layer (amac) is agnostic ...