Goby v2
queue_manager.h
1 // Copyright 2009-2018 Toby Schneider (http://gobysoft.org/index.wt/people/toby)
2 // GobySoft, LLC (2013-)
3 // Massachusetts Institute of Technology (2007-2014)
4 // Community contributors (see AUTHORS file)
5 //
6 //
7 // This file is part of the Goby Underwater Autonomy Project Libraries
8 // ("The Goby Libraries").
9 //
10 // The Goby Libraries are free software: you can redistribute them and/or modify
11 // them under the terms of the GNU Lesser General Public License as published by
12 // the Free Software Foundation, either version 2.1 of the License, or
13 // (at your option) any later version.
14 //
15 // The Goby Libraries are distributed in the hope that they will be useful,
16 // but WITHOUT ANY WARRANTY; without even the implied warranty of
17 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 // GNU Lesser General Public License for more details.
19 //
20 // You should have received a copy of the GNU Lesser General Public License
21 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
22 
23 #ifndef QueueManager20091204_H
24 #define QueueManager20091204_H
25 
26 #include <boost/bind.hpp>
27 #include <boost/signals2.hpp>
28 #include <limits>
29 #include <set>
30 
31 #include "goby/acomms/dccl.h"
32 #include "goby/acomms/protobuf/network_ack.pb.h"
33 #include "goby/acomms/protobuf/queue.pb.h"
34 
35 #include <deque>
36 #include <map>
37 
38 #include "queue.h"
39 #include "queue_exception.h"
40 
41 namespace goby
42 {
43 namespace acomms
44 {
50 {
51  public:
53  QueueManager();
56 
60 
61 
// Set (and overwrite completely if present) the current configuration.
// cfg: the protobuf::QueueManagerConfig to apply.
63  void set_cfg(const protobuf::QueueManagerConfig& cfg);
64 
// Set (and merge "repeat" fields) the current configuration, as opposed to
// set_cfg(), which overwrites it completely.
// cfg: the protobuf::QueueManagerConfig to merge in.
66  void merge_cfg(const protobuf::QueueManagerConfig& cfg);
67 
// Add a DCCL queue for use with QueueManager, with the message type chosen at
// compile time. The queue must be added before messages of this type are
// received.
// Forwards to the Descriptor-based add_queue() overload, passing
// ProtobufMessage::descriptor().
// queue_cfg: queue settings (protobuf::QueuedMessageEntry) for this type.
71  template <typename ProtobufMessage>
72  void add_queue(const protobuf::QueuedMessageEntry& queue_cfg)
73  {
74  add_queue(ProtobufMessage::descriptor(), queue_cfg);
75  }
76 
// Add a DCCL queue for the message type identified by a runtime descriptor.
// The templated add_queue<ProtobufMessage>() forwards to this overload.
// desc: descriptor of the protobuf message type to queue.
// queue_cfg: queue settings (protobuf::QueuedMessageEntry) for that type.
78  void add_queue(const google::protobuf::Descriptor* desc,
79  const protobuf::QueuedMessageEntry& queue_cfg); //@}
80 
85 
86 
// Push a message (and add the queue if it does not exist).
// new_message: the protobuf message to queue.
90  void push_message(const google::protobuf::Message& new_message);
// Overload of push_message() that additionally takes queue metadata.
// new_message: the protobuf message to queue.
// meta: pointer to a protobuf::QueuedMessageMeta for this message.
// NOTE(review): whether a null meta is accepted is not visible in this
// header — confirm against the implementation.
91  void push_message(const google::protobuf::Message& new_message,
92  const protobuf::QueuedMessageMeta* meta);
93 
// Flush (delete all messages in) a queue.
// flush: protobuf::QueueFlush request selecting the queue to empty.
97  void flush_queue(const protobuf::QueueFlush& flush);
99 
103 
104 
111 
117 
119 
123 
// Calculates which messages have expired and emits
// goby::acomms::QueueManager::signal_expire as necessary.
// NOTE(review): how often this must be called is not stated in this
// listing — confirm against the library documentation.
124  void do_work();
126 
128 
131 
132 
// Writes a human readable summary (including DCCLCodec info) of all loaded
// queues.
// os: stream to write the summary to.
136  void info_all(std::ostream* os) const;
137 
// Writes a human readable summary (including DCCLCodec info) of the queue for
// the DCCL type ProtobufMessage.
// Forwards to the Descriptor-based info() overload, passing
// ProtobufMessage::descriptor().
// os: stream to write the summary to.
142  template <typename ProtobufMessage> void info(std::ostream* os) const
143  {
144  info(ProtobufMessage::descriptor(), os);
145  }
146 
// Writes a human readable summary of the queue for the message type
// identified by desc. The templated info<ProtobufMessage>() forwards here.
// desc: descriptor of the protobuf message type to describe.
// os: stream to write the summary to.
148  void info(const google::protobuf::Descriptor* desc, std::ostream* os) const;
149 
// Returns a const reference to the glog_push_group_ member string
// (presumably the logging group name used for push events — confirm).
// NOTE(review): only reads a member but is not const-qualified.
150  const std::string& glog_push_group() { return glog_push_group_; }
// Returns a const reference to the glog_pop_group_ member string
// (presumably the logging group name used for pop events — confirm).
// NOTE(review): only reads a member but is not const-qualified.
151  const std::string& glog_pop_group() { return glog_pop_group_; }
// Returns a const reference to the glog_priority_group_ member string
// (presumably the logging group name used for priority output — confirm).
// NOTE(review): only reads a member but is not const-qualified.
152  const std::string& glog_priority_group() { return glog_priority_group_; }
// Returns a const reference to the glog_out_group_ member string
// (presumably the logging group name used for outgoing data — confirm).
// NOTE(review): only reads a member but is not const-qualified.
153  const std::string& glog_out_group() { return glog_out_group_; }
// Returns a const reference to the glog_in_group_ member string
// (presumably the logging group name used for incoming data — confirm).
// NOTE(review): only reads a member but is not const-qualified.
154  const std::string& glog_in_group() { return glog_in_group_; }
155 
// Builds a display string for a message type in the form
// "<full name> (<id>)", where <id> is codec_->id(desc) converted to a string
// with goby::util::as<std::string>.
// desc: descriptor of the message type; dereferenced without a null check,
// as is codec_.
// Returns the formatted string by value.
156  std::string msg_string(const google::protobuf::Descriptor* desc)
157  {
158  return desc->full_name() + " (" + goby::util::as<std::string>(codec_->id(desc)) + ")";
159  }
160 
// The current modem ID (MAC address) of this node; returns modem_id_.
// NOTE(review): only reads a member but is not const-qualified.
162  int modem_id() { return modem_id_; }
163 
// NOTE(review): the signature line for this body is absent from this
// listing; judging by the final call it is presumably meta_from_msg(msg)
// taking a google::protobuf::Message — confirm against the real header.
// Looks up the queue registered for msg's DCCL id and returns the result of
// that queue's meta_from_msg(msg).
// Throws QueueException if no queue is loaded for the message type.
165  {
166  unsigned dccl_id = codec_->id(msg.GetDescriptor());
167  if (!queues_.count(dccl_id))
168  throw(QueueException("No such queue [[" + msg.GetDescriptor()->full_name() +
169  "]] loaded"));
170 
// The count() check above means operator[] finds an existing entry here
// rather than default-inserting a null pointer.
171  return queues_[dccl_id]->meta_from_msg(msg);
172  }
173 
175 
177 
// Signals when acknowledgment of proper message receipt has been received.
// Slot arguments:
//   ack_msg  - the protobuf::ModemTransmission passed along with the ack.
//   orig_msg - the protobuf message the signal refers to (presumably the
//              message being acknowledged — confirm).
// The declarator line (listing line 183) was missing from this listing,
// leaving the declaration unterminated; it is restored here.
178  boost::signals2::signal<void(const protobuf::ModemTransmission& ack_msg,
182  const google::protobuf::Message& orig_msg)>
183  signal_ack;
184 
// Signals when a DCCL message is received.
// Slot argument: msg - the received protobuf message.
188  boost::signals2::signal<void(const google::protobuf::Message& msg)> signal_receive;
189 
// Signals when a message expires (exceeds its time-to-live or ttl) before
// being sent.
// Slot argument: orig_msg - the expired protobuf message.
193  boost::signals2::signal<void(const google::protobuf::Message& orig_msg)> signal_expire;
194 
// Forwards the data request to the application layer (an advanced feature).
// Slot arguments:
//   request_msg - the protobuf::ModemTransmission describing the request.
//   data_msg    - mutable message pointer handed to the slot (presumably
//                 for the application to fill in — confirm).
// The declarator line (listing line 201) was missing from this listing,
// leaving the declaration unterminated; it is restored here.
199  boost::signals2::signal<void(const protobuf::ModemTransmission& request_msg,
200  google::protobuf::Message* data_msg)>
201  signal_data_on_demand;
202 
// Signals when any queue changes size (message is popped or pushed).
// Slot argument: size - a protobuf::QueueSize, passed by value.
206  boost::signals2::signal<void(protobuf::QueueSize size)> signal_queue_size_change;
208 
// Used by a router to change next-hop destination (in meta).
// Slot arguments:
//   meta     - mutable protobuf::QueuedMessageMeta the slot may modify.
//   data_msg - the protobuf message being routed.
//   modem_id - an integer modem id (presumably this node's — confirm).
// The declarator line (listing line 212) was missing from this listing,
// leaving the declaration unterminated; it is restored here.
210  boost::signals2::signal<void(protobuf::QueuedMessageMeta* meta,
211  const google::protobuf::Message& data_msg, int modem_id)>
212  signal_out_route;
213 
// Used by a router to intercept messages and requeue them if desired.
// Slot arguments:
//   meta     - read-only protobuf::QueuedMessageMeta for the message.
//   data_msg - the protobuf message being routed.
//   modem_id - an integer modem id (presumably this node's — confirm).
// The declarator line (listing line 217) was missing from this listing,
// leaving the declaration unterminated; it is restored here.
215  boost::signals2::signal<void(const protobuf::QueuedMessageMeta& meta,
216  const google::protobuf::Message& data_msg, int modem_id)>
217  signal_in_route;
218 
224 
225  private:
// Copy constructor and copy assignment are declared in the private section,
// and no definition appears in this header (presumably to make QueueManager
// non-copyable — confirm there is no definition elsewhere).
226  QueueManager(const QueueManager&);
227  QueueManager& operator=(const QueueManager&);
229 
230  void qsize(Queue* q);
231 
232  // finds the %queue with the highest priority
233  Queue* find_next_sender(const protobuf::ModemTransmission& message, const std::string& data,
234  bool first_user_frame);
235 
236  // clears the destination and ack values for the packet to reset for next $CADRQ
237  void clear_packet(const protobuf::ModemTransmission& message);
238  void process_cfg();
239 
240  void process_modem_ack(const protobuf::ModemTransmission& ack_msg);
241  void create_network_ack(int ack_src, const google::protobuf::Message& orig_msg,
242  goby::acomms::protobuf::NetworkAck::AckType ack_type);
243 
244  // "overload" those from DCCLCodec to allow changing of crypto passphrase
245  std::string encode_repeated(const std::list<QueuedMessage>& msgs);
246  std::list<QueuedMessage> decode_repeated(const std::string& orig_bytes);
247  unsigned size_repeated(const std::list<QueuedMessage>& msgs);
248 
249  private:
250  friend class Queue;
251  int modem_id_;
252  std::map<unsigned, boost::shared_ptr<Queue> > queues_;
253 
254  // map frame number onto %queue pointer that contains
255  // the data for this ack
256  std::multimap<unsigned, Queue*> waiting_for_ack_;
257 
258  // the first *user* frame sets the tone (dest & ack) for the entire packet (all %modem frames)
259  unsigned packet_ack_;
260  int packet_dest_;
261 
262  typedef int ModemId;
263  std::set<ModemId> network_ack_src_ids_;
264  std::set<ModemId> route_additional_modem_ids_;
265 
266  // maps id to crypto passphrase
267  std::map<ModemId, std::string> encrypt_rules_;
268 
270 
271  goby::acomms::DCCLCodec* codec_;
272 
273  std::string glog_push_group_;
274  std::string glog_pop_group_;
275  std::string glog_priority_group_;
276  std::string glog_out_group_;
277  std::string glog_in_group_;
278 
279  static int count_;
280 
// Small helper that records which Manipulator values are attached to each
// DCCL id, backed by a multimap so one id can carry several manipulators.
281  class ManipulatorManager
282  {
283  public:
// Attach manipulator `manip` to DCCL id `id`. Entries are inserted
// unconditionally, so adding the same pair twice stores it twice.
284  void add(unsigned id, goby::acomms::protobuf::Manipulator manip)
285  {
286  manips_.insert(std::make_pair(id, manip));
287  }
288 
// Returns true if `manip` has been added for DCCL id `id`, false otherwise.
289  bool has(unsigned id, goby::acomms::protobuf::Manipulator manip) const
290  {
291  typedef std::multimap<unsigned, goby::acomms::protobuf::Manipulator>::const_iterator
292  iterator;
// equal_range gives the [first, second) span of entries stored under `id`.
293  std::pair<iterator, iterator> p = manips_.equal_range(id);
294 
// Scan only the entries for this id.
295  for (iterator it = p.first; it != p.second; ++it)
296  {
297  if (it->second == manip)
298  return true;
299  }
300 
301  return false;
302  }
303 
// Remove every stored (id, manipulator) entry.
304  void clear() { manips_.clear(); }
305 
306  private:
307  // manipulator multimap (no_encode, no_decode, etc)
308  // maps DCCL ID (unsigned) onto Manipulator enumeration (xml_config.proto)
309  std::multimap<unsigned, goby::acomms::protobuf::Manipulator> manips_;
310  };
311 
312  ManipulatorManager manip_manager_;
313 };
314 
// Stream insertion operator for QueueManager: takes the output stream and a
// const QueueManager, and returns a std::ostream reference. Only the
// declaration appears here; what is written is defined elsewhere.
316 std::ostream& operator<<(std::ostream& out, const QueueManager& d);
317 
318 } // namespace acomms
319 
320 } // namespace goby
321 
322 #endif
provides an API to the goby-acomms Queuing Library.
Definition: queue_manager.h:49
boost::signals2::signal< void(const protobuf::ModemTransmission &ack_msg, const google::protobuf::Message &orig_msg)> signal_ack
Signals when acknowledgment of proper message receipt has been received. This is only sent for queues...
boost::signals2::signal< void(const protobuf::ModemTransmission &request_msg, google::protobuf::Message *data_msg)> signal_data_on_demand
Forwards the data request to the application layer. This advanced feature is used when queue...
boost::signals2::signal< void(const google::protobuf::Message &msg)> signal_receive
Signals when a DCCL message is received.
Exception class for libdccl.
void handle_modem_data_request(protobuf::ModemTransmission *msg)
Finds data to send to the modem.
void add_queue(const protobuf::QueuedMessageEntry &queue_cfg)
Add a DCCL queue for use with QueueManager. Note that the queue must be added before receiving messag...
Definition: queue_manager.h:72
boost::signals2::signal< void(const protobuf::QueuedMessageMeta &meta, const google::protobuf::Message &data_msg, int modem_id)> signal_in_route
Used by a router to intercept messages and requeue them if desired.
void merge_cfg(const protobuf::QueueManagerConfig &cfg)
Set (and merge "repeat" fields) the current configuration. (protobuf::QueueManagerConfig defined in a...
void handle_modem_receive(const protobuf::ModemTransmission &message)
Receive incoming data from the modem.
boost::signals2::signal< void(protobuf::QueuedMessageMeta *meta, const google::protobuf::Message &data_msg, int modem_id)> signal_out_route
Used by a router to change next-hop destination (in meta)
The global namespace for the Goby project.
boost::signals2::signal< void(const google::protobuf::Message &orig_msg)> signal_expire
Signals when a message expires (exceeds its time-to-live or ttl) before being sent (if queue...
void set_cfg(const protobuf::QueueManagerConfig &cfg)
Set (and overwrite completely if present) the current configuration. (protobuf::QueueManagerConfig de...
boost::signals2::signal< void(protobuf::QueueSize size)> signal_queue_size_change
Signals when any queue changes size (message is popped or pushed)
void info_all(std::ostream *os) const
Writes a human readable summary (including DCCLCodec info) of all loaded queues.
void do_work()
Calculates which messages have expired and emits the goby::acomms::QueueManager::signal_expire as nec...
void info(std::ostream *os) const
Writes a human readable summary (including DCCLCodec info) of the queue for the provided DCCL type to...
int modem_id()
The current modem ID (MAC address) of this node.
void flush_queue(const protobuf::QueueFlush &flush)
Flush (delete all messages in) a queue.
void push_message(const google::protobuf::Message &new_message)
Push a message (and add the queue if it does not exist)