Goby3  3.1.4
2024.02.22
queue_manager.h
Go to the documentation of this file.
1 // Copyright 2009-2021:
2 // GobySoft, LLC (2013-)
3 // Massachusetts Institute of Technology (2007-2014)
4 // Community contributors (see AUTHORS file)
5 // File authors:
6 // Toby Schneider <toby@gobysoft.org>
7 //
8 //
9 // This file is part of the Goby Underwater Autonomy Project Libraries
10 // ("The Goby Libraries").
11 //
12 // The Goby Libraries are free software: you can redistribute them and/or modify
13 // them under the terms of the GNU Lesser General Public License as published by
14 // the Free Software Foundation, either version 2.1 of the License, or
15 // (at your option) any later version.
16 //
17 // The Goby Libraries are distributed in the hope that they will be useful,
18 // but WITHOUT ANY WARRANTY; without even the implied warranty of
19 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 // GNU Lesser General Public License for more details.
21 //
22 // You should have received a copy of the GNU Lesser General Public License
23 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
24 
25 #ifndef GOBY_ACOMMS_QUEUE_QUEUE_MANAGER_H
26 #define GOBY_ACOMMS_QUEUE_QUEUE_MANAGER_H
27 
28 #include <iosfwd> // for ostream
29 #include <list> // for list
30 #include <map> // for allocator, multimap
31 #include <memory> // for shared_ptr, __sha...
32 #include <set> // for set
33 #include <string> // for string, operator+
34 #include <utility> // for pair, make_pair
35 
36 #include <boost/signals2/signal.hpp> // for signal
37 #include <google/protobuf/descriptor.h> // for Descriptor
38 #include <google/protobuf/message.h> // for Message
39 
40 #include "goby/acomms/dccl/dccl.h" // for DCCLCodec
41 #include "goby/acomms/protobuf/manipulator.pb.h" // for Manipulator
42 #include "goby/acomms/protobuf/network_ack.pb.h" // for NetworkAck, Netwo...
43 #include "goby/acomms/protobuf/queue.pb.h" // for QueuedMessageEntr...
44 #include "goby/util/as.h" // for as
45 
46 #include "queue.h" // for Queue
47 #include "queue_exception.h" // for QueueException
48 
49 namespace goby
50 {
51 namespace acomms
52 {
53 namespace protobuf
54 {
55 class ModemTransmission;
56 } // namespace protobuf
61 
67 {
68  public:
70  QueueManager();
72  ~QueueManager() = default;
73 
77 
78 
80  void set_cfg(const protobuf::QueueManagerConfig& cfg);
81 
83  void merge_cfg(const protobuf::QueueManagerConfig& cfg);
84 
88  template <typename ProtobufMessage>
89  void add_queue(const protobuf::QueuedMessageEntry& queue_cfg)
90  {
91  add_queue(ProtobufMessage::descriptor(), queue_cfg);
92  }
93 
95  void add_queue(const google::protobuf::Descriptor* desc,
96  const protobuf::QueuedMessageEntry& queue_cfg); //@}
97 
102 
103 
107  void push_message(const google::protobuf::Message& new_message);
108  void push_message(const google::protobuf::Message& new_message,
109  const protobuf::QueuedMessageMeta* meta);
110 
114  void flush_queue(const protobuf::QueueFlush& flush);
116 
120 
121 
127 
133 
135 
139 
140  void do_work();
142 
144 
147 
148 
152  void info_all(std::ostream* os) const;
153 
158  template <typename ProtobufMessage> void info(std::ostream* os) const
159  {
160  info(ProtobufMessage::descriptor(), os);
161  }
162 
164  void info(const google::protobuf::Descriptor* desc, std::ostream* os) const;
165 
166  const std::string& glog_push_group() { return glog_push_group_; }
167  const std::string& glog_pop_group() { return glog_pop_group_; }
168  const std::string& glog_priority_group() { return glog_priority_group_; }
169  const std::string& glog_out_group() { return glog_out_group_; }
170  const std::string& glog_in_group() { return glog_in_group_; }
171 
172  std::string msg_string(const google::protobuf::Descriptor* desc)
173  {
174  return desc->full_name() + " (" + goby::util::as<std::string>(codec_->id(desc)) + ")";
175  }
176 
178  int modem_id() { return modem_id_; }
179 
181  {
182  unsigned dccl_id = codec_->id(msg.GetDescriptor());
183  if (!queues_.count(dccl_id))
184  throw(QueueException("No such queue [[" + msg.GetDescriptor()->full_name() +
185  "]] loaded"));
186 
187  return queues_[dccl_id]->meta_from_msg(msg);
188  }
189 
191 
193 
194  boost::signals2::signal<void(const protobuf::ModemTransmission& ack_msg,
199  const google::protobuf::Message& orig_msg)>
200  signal_ack;
201 
205  boost::signals2::signal<void(const google::protobuf::Message& msg)> signal_receive;
206 
210  boost::signals2::signal<void(const google::protobuf::Message& orig_msg)> signal_expire;
211 
216  boost::signals2::signal<void(const protobuf::ModemTransmission& request_msg,
217  google::protobuf::Message* data_msg)>
218  signal_data_on_demand;
219 
223  boost::signals2::signal<void(protobuf::QueueSize size)> signal_queue_size_change;
225 
227  boost::signals2::signal<void(protobuf::QueuedMessageMeta* meta,
228  const google::protobuf::Message& data_msg, int modem_id)>
229  signal_out_route;
230 
232  boost::signals2::signal<void(const protobuf::QueuedMessageMeta& meta,
233  const google::protobuf::Message& data_msg, int modem_id)>
234  signal_in_route;
235 
236  private:
237  QueueManager(const QueueManager&) = delete;
238  QueueManager& operator=(const QueueManager&) = delete;
240 
241  void qsize(Queue* q);
242 
243  // finds the %queue with the highest priority
244  Queue* find_next_sender(const protobuf::ModemTransmission& message, const std::string& data,
245  bool first_user_frame);
246 
247  // clears the destination and ack values for the packet to reset for next $CADRQ
248  void clear_packet(const protobuf::ModemTransmission& message);
249  void process_cfg();
250 
251  void process_modem_ack(const protobuf::ModemTransmission& ack_msg);
252  void create_network_ack(int ack_src, const google::protobuf::Message& orig_msg,
254 
255  // "overload" those from DCCLCodec to allow changing of crypto passphrase
256  std::string encode_repeated(const std::list<QueuedMessage>& msgs);
257  std::list<QueuedMessage> decode_repeated(const std::string& orig_bytes);
258  unsigned size_repeated(const std::list<QueuedMessage>& msgs);
259 
260  private:
261  friend class Queue;
262  int modem_id_;
263  std::map<unsigned, std::shared_ptr<Queue> > queues_;
264 
265  // map frame number onto %queue pointer that contains
266  // the data for this ack
267  std::multimap<unsigned, Queue*> waiting_for_ack_;
268 
269  // the first *user* frame sets the tone (dest & ack) for the entire packet (all %modem frames)
270  unsigned packet_ack_{0};
271  int packet_dest_;
272 
273  typedef int ModemId;
274  std::set<ModemId> network_ack_src_ids_;
275  std::set<ModemId> route_additional_modem_ids_;
276 
277  // maps id to crypto passphrase
278  std::map<ModemId, std::string> encrypt_rules_;
279 
280  protobuf::QueueManagerConfig cfg_;
281 
282  goby::acomms::DCCLCodec* codec_;
283 
284  std::string glog_push_group_;
285  std::string glog_pop_group_;
286  std::string glog_priority_group_;
287  std::string glog_out_group_;
288  std::string glog_in_group_;
289 
290  static int count_;
291 
292  class ManipulatorManager
293  {
294  public:
295  void add(unsigned id, goby::acomms::protobuf::Manipulator manip)
296  {
297  manips_.insert(std::make_pair(id, manip));
298  }
299 
300  bool has(unsigned id, goby::acomms::protobuf::Manipulator manip) const
301  {
302  using iterator =
303  std::multimap<unsigned int, goby::acomms::protobuf::Manipulator>::const_iterator;
304  std::pair<iterator, iterator> p = manips_.equal_range(id);
305 
306  for (auto it = p.first; it != p.second; ++it)
307  {
308  if (it->second == manip)
309  return true;
310  }
311 
312  return false;
313  }
314 
315  void clear() { manips_.clear(); }
316 
317  private:
318  // manipulator multimap (no_encode, no_decode, etc)
319  // maps DCCL ID (unsigned) onto Manipulator enumeration (xml_config.proto)
320  std::multimap<unsigned, goby::acomms::protobuf::Manipulator> manips_;
321  };
322 
323  ManipulatorManager manip_manager_;
324 };
325 
327 std::ostream& operator<<(std::ostream& out, const QueueManager& d);
328 
329 } // namespace acomms
330 
331 } // namespace goby
332 
333 #endif
goby::acomms::protobuf::ModemTransmission
Definition: modem_message.pb.h:166
queue.pb.h
goby::acomms::QueueManager::add_queue
void add_queue(const protobuf::QueuedMessageEntry &queue_cfg)
Add a DCCL queue for use with QueueManager. Note that the queue must be added before receiving messag...
Definition: queue_manager.h:89
goby::acomms::QueueManager::glog_push_group
const std::string & glog_push_group()
Definition: queue_manager.h:166
goby::acomms::protobuf::QueuedMessageMeta
Definition: queue.pb.h:1126
goby::acomms::Queue
Definition: queue.h:73
goby
The global namespace for the Goby project.
Definition: acomms_constants.h:33
goby::acomms::QueueManager::glog_out_group
const std::string & glog_out_group()
Definition: queue_manager.h:169
as.h
goby::acomms::protobuf::Manipulator
Manipulator
Definition: manipulator.pb.h:61
goby::acomms::protobuf::QueueFlush
Definition: queue.pb.h:1012
goby::acomms::QueueManager::meta_from_msg
protobuf::QueuedMessageMeta meta_from_msg(const google::protobuf::Message &msg)
Definition: queue_manager.h:180
goby::acomms::QueueManager::handle_modem_receive
void handle_modem_receive(const protobuf::ModemTransmission &message)
Receive incoming data from the modem.
detail::void
j template void())
Definition: json.hpp:4822
goby::acomms::QueueManager::push_message
void push_message(const google::protobuf::Message &new_message)
Push a message (and add the queue if it does not exist)
goby::acomms::protobuf::NetworkAck_AckType
NetworkAck_AckType
Definition: network_ack.pb.h:73
goby::acomms::QueueManager::msg_string
std::string msg_string(const google::protobuf::Descriptor *desc)
Definition: queue_manager.h:172
goby::acomms::operator<<
std::ostream & operator<<(std::ostream &os, const MACManager &mac)
goby::acomms::QueueManager::handle_modem_data_request
void handle_modem_data_request(protobuf::ModemTransmission *msg)
Finds data to send to the modem.
goby::acomms::QueueManager::modem_id
int modem_id()
The current modem ID (MAC address) of this node.
Definition: queue_manager.h:178
goby::acomms::DCCLCodec
Definition: dccl.h:128
goby::acomms::QueueManager::~QueueManager
~QueueManager()=default
destructor
dccl.h
message.h
queue_exception.h
goby::acomms::QueueManager::glog_priority_group
const std::string & glog_priority_group()
Definition: queue_manager.h:168
goby::acomms::protobuf::QueueSize
Definition: queue.pb.h:885
goby::acomms::QueueException
Exception class for libdccl.
Definition: queue_exception.h:36
goby::acomms::QueueManager::set_cfg
void set_cfg(const protobuf::QueueManagerConfig &cfg)
Set (and overwrite completely if present) the current configuration. (protobuf::QueueManagerConfig de...
goby::msg
extern ::google::protobuf::internal::ExtensionIdentifier< ::google::protobuf::MessageOptions, ::google::protobuf::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
Definition: option_extensions.pb.h:1327
goby::acomms::QueueManager::do_work
void do_work()
Calculates which messages have expired and emits the goby::acomms::QueueManager::signal_expire as nec...
goby::acomms::QueueManager::flush_queue
void flush_queue(const protobuf::QueueFlush &flush)
Flush (delete all messages in) a queue.
google::protobuf::Message
Definition: message.h:189
goby::acomms::QueueManager
provides an API to the goby-acomms Queuing Library.
Definition: queue_manager.h:66
manipulator.pb.h
goby::acomms::QueueManager::merge_cfg
void merge_cfg(const protobuf::QueueManagerConfig &cfg)
Set (and merge "repeat" fields) the current configuration. (protobuf::QueueManagerConfig defined in a...
goby::acomms::QueueManager::info
void info(std::ostream *os) const
Writes a human readable summary (including DCCLCodec info) of the queue for the provided DCCL type to...
Definition: queue_manager.h:158
goby::acomms::protobuf::QueuedMessageEntry
Definition: queue.pb.h:319
goby::acomms::QueueManager::glog_in_group
const std::string & glog_in_group()
Definition: queue_manager.h:170
goby::acomms::QueueManager::glog_pop_group
const std::string & glog_pop_group()
Definition: queue_manager.h:167
queue.h
goby::acomms::QueueManager::info_all
void info_all(std::ostream *os) const
Writes a human readable summary (including DCCLCodec info) of all loaded queues.
goby::acomms::protobuf::QueueManagerConfig
Definition: queue.pb.h:687
network_ack.pb.h
goby::acomms::QueueManager::QueueManager
QueueManager()
constructor