23 #ifndef QueueManager20091204_H 24 #define QueueManager20091204_H 26 #include <boost/bind.hpp> 27 #include <boost/signals2.hpp> 31 #include "goby/acomms/dccl.h" 32 #include "goby/acomms/protobuf/network_ack.pb.h" 33 #include "goby/acomms/protobuf/queue.pb.h" 39 #include "queue_exception.h" 71 template <
typename ProtobufMessage>
74 add_queue(ProtobufMessage::descriptor(), queue_cfg);
78 void add_queue(
const google::protobuf::Descriptor* desc,
/// \brief Writes a human-readable summary (including DCCLCodec info) of all loaded queues.
/// \param os stream pointer that receives the summary.
136 void info_all(std::ostream* os)
const;
/// \brief Type-based convenience overload of info().
///
/// Forwards to info(const google::protobuf::Descriptor*, std::ostream*) with the
/// descriptor obtained from ProtobufMessage::descriptor().
/// \tparam ProtobufMessage type providing a static descriptor() function.
/// \param os stream pointer passed through unchanged to the descriptor overload.
142 template <
typename ProtobufMessage>
void info(std::ostream* os)
const 144 info(ProtobufMessage::descriptor(), os);
/// \brief Descriptor-based overload of info(); declared here, defined elsewhere.
/// \param desc descriptor identifying the message type to report on.
/// \param os stream pointer that receives the output.
148 void info(
const google::protobuf::Descriptor* desc, std::ostream* os)
const;
/// \brief Accessor returning the glog_push_group_ member by const reference.
/// NOTE(review): only reads a member but is not const-qualified, so it cannot be
/// called through a const QueueManager — consider adding `const`.
150 const std::string& glog_push_group() {
return glog_push_group_; }
/// \brief Accessor returning the glog_pop_group_ member by const reference.
/// NOTE(review): only reads a member but is not const-qualified — consider adding `const`.
151 const std::string& glog_pop_group() {
return glog_pop_group_; }
/// \brief Accessor returning the glog_priority_group_ member by const reference.
/// NOTE(review): only reads a member but is not const-qualified — consider adding `const`.
152 const std::string& glog_priority_group() {
return glog_priority_group_; }
/// \brief Accessor returning the glog_out_group_ member by const reference.
/// NOTE(review): only reads a member but is not const-qualified — consider adding `const`.
153 const std::string& glog_out_group() {
return glog_out_group_; }
/// \brief Accessor returning the glog_in_group_ member by const reference.
/// NOTE(review): only reads a member but is not const-qualified — consider adding `const`.
154 const std::string& glog_in_group() {
return glog_in_group_; }
/// \brief Builds a display label for a message type.
///
/// The result is the descriptor's full name followed by " (" , the value of
/// codec_->id(desc) converted to a string, and ")".
/// \param desc descriptor of the message type; dereferenced without a null check.
/// \return the composed label string.
156 std::string msg_string(
const google::protobuf::Descriptor* desc)
158 return desc->full_name() +
" (" + goby::util::as<std::string>(codec_->id(desc)) +
")";
166 unsigned dccl_id = codec_->id(msg.GetDescriptor());
167 if (!queues_.count(dccl_id))
168 throw(
QueueException(
"No such queue [[" + msg.GetDescriptor()->full_name() +
171 return queues_[dccl_id]->meta_from_msg(msg);
/// \brief Signals when a DCCL message is received.
/// Slots take the message as a const google::protobuf::Message reference.
188 boost::signals2::signal<void(const google::protobuf::Message& msg)>
signal_receive;
/// \brief Signals when a message expires (exceeds its time-to-live) before being sent.
/// Slots take the original message as a const google::protobuf::Message reference.
193 boost::signals2::signal<void(const google::protobuf::Message& orig_msg)>
signal_expire;
/// \brief Helper taking a pointer to a single Queue; declared here, defined elsewhere.
/// NOTE(review): presumably reports the size of \a q (e.g. via a queue-size signal) —
/// confirm against the implementation.
230 void qsize(
Queue* q);
234 bool first_user_frame);
242 goby::acomms::protobuf::NetworkAck::AckType ack_type);
/// \brief Takes a list of QueuedMessage objects and returns a std::string.
/// NOTE(review): presumably the combined encoded bytes of \a msgs — confirm in the implementation.
245 std::string encode_repeated(
const std::list<QueuedMessage>& msgs);
/// \brief Takes a byte string and returns a list of QueuedMessage objects.
/// NOTE(review): presumably the inverse of encode_repeated() — confirm in the implementation.
246 std::list<QueuedMessage> decode_repeated(
const std::string& orig_bytes);
/// \brief Takes a list of QueuedMessage objects and returns an unsigned value.
/// NOTE(review): presumably the encoded size of \a msgs; units not visible here — confirm.
247 unsigned size_repeated(
const std::list<QueuedMessage>& msgs);
// Queues held by shared pointer; looked up elsewhere in this class by the
// id returned from codec_->id(descriptor).
252 std::map<unsigned, boost::shared_ptr<Queue> > queues_;
// Non-owning Queue pointers keyed by an unsigned value.
// NOTE(review): presumably keyed by frame number while an ack is pending — confirm.
256 std::multimap<unsigned, Queue*> waiting_for_ack_;
// NOTE(review): meaning not visible in this excerpt — confirm against the implementation.
259 unsigned packet_ack_;
// Sets of modem ids. NOTE(review): names suggest network-ack sources and
// additional routing destinations respectively — confirm.
263 std::set<ModemId> network_ack_src_ids_;
264 std::set<ModemId> route_additional_modem_ids_;
// Per-modem-id string. NOTE(review): presumably an encryption passphrase/rule — confirm.
267 std::map<ModemId, std::string> encrypt_rules_;
// Group-name strings returned by the glog_*_group() accessors.
273 std::string glog_push_group_;
274 std::string glog_pop_group_;
275 std::string glog_priority_group_;
276 std::string glog_out_group_;
277 std::string glog_in_group_;
/// \brief Small helper that records which Manipulator values are associated with an unsigned id.
///
/// Backed by a std::multimap, so one id may map to several manipulators.
281 class ManipulatorManager
/// Inserts the pair (id, manip) into manips_. No duplicate check is done here,
/// so adding the same pair twice stores it twice.
284 void add(
unsigned id, goby::acomms::protobuf::Manipulator manip)
286 manips_.insert(std::make_pair(
id, manip));
/// Walks every entry stored under \a id (via equal_range) and compares it to \a manip.
/// NOTE(review): the return statements are not visible in this excerpt; presumably
/// returns true on the first match and false otherwise — confirm.
289 bool has(
unsigned id, goby::acomms::protobuf::Manipulator manip)
const 291 typedef std::multimap<unsigned, goby::acomms::protobuf::Manipulator>::const_iterator
293 std::pair<iterator, iterator> p = manips_.equal_range(
id);
295 for (iterator it = p.first; it != p.second; ++it)
297 if (it->second == manip)
/// Removes every stored (id, manipulator) association.
304 void clear() { manips_.clear(); }
// id -> manipulator associations; multimap allows several manipulators per id.
309 std::multimap<unsigned, goby::acomms::protobuf::Manipulator> manips_;
// ManipulatorManager instance held by value as a data member.
312 ManipulatorManager manip_manager_;
/// \brief Stream-insertion operator for QueueManager (declaration only; the
/// output format is defined by the implementation, which is not shown here).
/// \param out destination stream.
/// \param d manager to write.
/// \return a std::ostream reference (conventionally \a out, to allow chaining).
316 std::ostream& operator<<(std::ostream& out,
const QueueManager& d);
Provides an API to the goby-acomms Queuing Library.
boost::signals2::signal< void(const protobuf::ModemTransmission &ack_msg, const google::protobuf::Message &orig_msg)> signal_ack
Signals when acknowledgment of proper message receipt has been received. This is only sent for queues...
boost::signals2::signal< void(const protobuf::ModemTransmission &request_msg, google::protobuf::Message *data_msg)> signal_data_on_demand
Forwards the data request to the application layer. This advanced feature is used when queue...
boost::signals2::signal< void(const google::protobuf::Message &msg)> signal_receive
Signals when a DCCL message is received.
Exception class for libdccl.
void handle_modem_data_request(protobuf::ModemTransmission *msg)
Finds data to send to the modem.
void add_queue(const protobuf::QueuedMessageEntry &queue_cfg)
Add a DCCL queue for use with QueueManager. Note that the queue must be added before receiving messag...
boost::signals2::signal< void(const protobuf::QueuedMessageMeta &meta, const google::protobuf::Message &data_msg, int modem_id)> signal_in_route
Used by a router to intercept messages and requeue them if desired.
void merge_cfg(const protobuf::QueueManagerConfig &cfg)
Set (and merge "repeat" fields) the current configuration. (protobuf::QueueManagerConfig defined in a...
void handle_modem_receive(const protobuf::ModemTransmission &message)
Receive incoming data from the modem.
boost::signals2::signal< void(protobuf::QueuedMessageMeta *meta, const google::protobuf::Message &data_msg, int modem_id)> signal_out_route
Used by a router to change next-hop destination (in meta)
~QueueManager()
destructor
The global namespace for the Goby project.
boost::signals2::signal< void(const google::protobuf::Message &orig_msg)> signal_expire
Signals when a message expires (exceeds its time-to-live or ttl) before being sent (if queue...
void set_cfg(const protobuf::QueueManagerConfig &cfg)
Set (and overwrite completely if present) the current configuration. (protobuf::QueueManagerConfig de...
boost::signals2::signal< void(protobuf::QueueSize size)> signal_queue_size_change
Signals when any queue changes size (message is popped or pushed)
void info_all(std::ostream *os) const
Writes a human readable summary (including DCCLCodec info) of all loaded queues.
void do_work()
Calculates which messages have expired and emits the goby::acomms::QueueManager::signal_expire as nec...
void info(std::ostream *os) const
Writes a human readable summary (including DCCLCodec info) of the queue for the provided DCCL type to...
int modem_id()
The current modem ID (MAC address) of this node.
void flush_queue(const protobuf::QueueFlush &flush)
Flush (delete all messages in) a queue.
void push_message(const google::protobuf::Message &new_message)
Push a message (and add the queue if it does not exist)
QueueManager()
constructor