Goby3 3.5.1
2026.06.04
Loading...
Searching...
No Matches
queue_manager.h
Go to the documentation of this file.
1// Copyright 2009-2026:
2// GobySoft, LLC (2013-)
3// Massachusetts Institute of Technology (2007-2014)
4// Community contributors (see AUTHORS file)
5// File authors:
6// Toby Schneider <toby@gobysoft.org>
7// Copilot <198982749+Copilot@users.noreply.github.com>
8//
9//
10// This file is part of the Goby Underwater Autonomy Project Libraries
11// ("The Goby Libraries").
12//
13// The Goby Libraries are free software: you can redistribute them and/or modify
14// them under the terms of the GNU Lesser General Public License as published by
15// the Free Software Foundation, either version 2.1 of the License, or
16// (at your option) any later version.
17//
18// The Goby Libraries are distributed in the hope that they will be useful,
19// but WITHOUT ANY WARRANTY; without even the implied warranty of
20// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21// GNU Lesser General Public License for more details.
22//
23// You should have received a copy of the GNU Lesser General Public License
24// along with Goby. If not, see <http://www.gnu.org/licenses/>.
25
26#ifndef GOBY_ACOMMS_QUEUE_QUEUE_MANAGER_H
27#define GOBY_ACOMMS_QUEUE_QUEUE_MANAGER_H
28
29#include <iosfwd> // for ostream
30#include <list> // for list
31#include <map> // for allocator, multimap
32#include <memory> // for shared_ptr, __sha...
33#include <set> // for set
34#include <string> // for string, operator+
35#include <utility> // for pair, make_pair
36
37#include <boost/signals2/signal.hpp> // for signal
38#include <google/protobuf/descriptor.h> // for Descriptor
39#include <google/protobuf/message.h> // for Message
40
41#include "goby/acomms/dccl/dccl.h" // for DCCLCodec
42#include "goby/acomms/protobuf/manipulator.pb.h" // for Manipulator
43#include "goby/acomms/protobuf/network_ack.pb.h" // for NetworkAck, Netwo...
44#include "goby/acomms/protobuf/queue.pb.h" // for QueuedMessageEntr...
45#include "goby/util/as.h" // for as
46
47#include "queue.h" // for Queue
48#include "queue_exception.h" // for QueueException
49
50namespace goby
51{
52namespace acomms
53{
54namespace protobuf
55{
56class ModemTransmission;
57} // namespace protobuf
62
68{
69 public:
// Destructor; explicitly defaulted, so it performs no custom cleanup of its own.
73 ~QueueManager() = default;
74
78
79
82
85
89 template <typename ProtobufMessage>
91 {
92 add_queue(ProtobufMessage::descriptor(), queue_cfg);
93 }
94
// Adds a queue for the message type described by `desc`, configured by `queue_cfg`.
// The generated documentation describes this as the alternative method for adding
// queues when using dynamic Protobuf messages; the templated add_queue forwards
// ProtobufMessage::descriptor() to this overload.
96 void add_queue(const google::protobuf::Descriptor* desc,
97 const protobuf::QueuedMessageEntry& queue_cfg); //@}
98
103
104
// Pushes `new_message` onto its queue. Per the generated documentation, the queue
// is added first if it does not already exist.
108 void push_message(const google::protobuf::Message& new_message);
110 const protobuf::QueuedMessageMeta* meta);
111
117
121
122
128
134
136
140
141
// Per the generated documentation (summary truncated in this listing): calculates
// which messages have expired and emits signal_expire for them.
// NOTE(review): the definition is not shown here; confirm what else it does.
142 void do_work();
143
145
148
149
// Writes a human-readable summary (including DCCLCodec info) of all loaded queues
// to the stream pointed to by `os`.
153 void info_all(std::ostream* os) const;
154
// Convenience overload for message types known at compile time: forwards
// ProtobufMessage::descriptor() and `os` to info(const Descriptor*, std::ostream*).
// Per the generated documentation it writes a human-readable summary (including
// DCCLCodec info) of the queue for the given DCCL type.
159 template <typename ProtobufMessage> void info(std::ostream* os) const
160 {
161 info(ProtobufMessage::descriptor(), os);
162 }
163
// Descriptor-based form of info(): the generated documentation describes it as the
// alternative for message types not known at compile time. The templated info()
// delegates to this overload.
165 void info(const google::protobuf::Descriptor* desc, std::ostream* os) const;
166
// Accessors returning, by const reference, the strings held in the matching
// glog_*_group_ members (push, pop, priority, out, in).
// NOTE(review): presumably these are logging group names; confirm against the
// implementation, which is not part of this listing.
// NOTE(review): none of these modify the object, yet they are not const-qualified.
167 const std::string& glog_push_group() { return glog_push_group_; }
168 const std::string& glog_pop_group() { return glog_pop_group_; }
169 const std::string& glog_priority_group() { return glog_priority_group_; }
170 const std::string& glog_out_group() { return glog_out_group_; }
171 const std::string& glog_in_group() { return glog_in_group_; }
172
// Builds a display string for a message type in the form "<full name> (<id>)",
// where <id> is codec_->id(desc) converted to text with goby::util::as.
// `desc` is dereferenced unconditionally, so callers must pass a non-null pointer.
173 std::string msg_string(const google::protobuf::Descriptor* desc)
174 {
175 return desc->full_name() + " (" + goby::util::as<std::string>(codec_->id(desc)) + ")";
176 }
177
// Returns modem_id_. The generated documentation describes this value as the
// current modem ID (MAC address) of this node.
179 int modem_id() { return modem_id_; }
180
182 {
183 unsigned dccl_id = codec_->id(msg.GetDescriptor());
184 if (!queues_.count(dccl_id))
185 throw(QueueException("No such queue [[" + msg.GetDescriptor()->full_name() +
186 "]] loaded"));
187
188 return queues_[dccl_id]->meta_from_msg(msg);
189 }
190
192
194
195
199 boost::signals2::signal<void(const protobuf::ModemTransmission& ack_msg,
200 const google::protobuf::Message& orig_msg)>
202
// Per the generated documentation: signals when a DCCL message is received.
// Slots receive the message by const reference.
206 boost::signals2::signal<void(const google::protobuf::Message& msg)> signal_receive;
207
// Per the generated documentation (summary truncated in this listing): signals when
// a message expires (exceeds its time-to-live) before being sent. Slots receive the
// original message by const reference.
211 boost::signals2::signal<void(const google::protobuf::Message& orig_msg)> signal_expire;
212
217 boost::signals2::signal<void(const protobuf::ModemTransmission& request_msg,
218 google::protobuf::Message* data_msg)>
220
// Per the generated documentation: signals when any queue changes size (a message
// is popped or pushed). Slots receive a protobuf::QueueSize by value.
224 boost::signals2::signal<void(protobuf::QueueSize size)> signal_queue_size_change;
226
228 boost::signals2::signal<void(protobuf::QueuedMessageMeta* meta,
229 const google::protobuf::Message& data_msg, int modem_id)>
231
233 boost::signals2::signal<void(const protobuf::QueuedMessageMeta& meta,
234 const google::protobuf::Message& data_msg, int modem_id)>
236
237 private:
// QueueManager is non-copyable: copy construction and copy assignment are deleted.
238 QueueManager(const QueueManager&) = delete;
239 QueueManager& operator=(const QueueManager&) = delete;
241
242 void qsize(Queue* q);
243
244 // finds the %queue with the highest priority
245 Queue* find_next_sender(const protobuf::ModemTransmission& message, const std::string& data,
246 bool first_user_frame);
247
248 // clears the destination and ack values for the packet to reset for next $CADRQ
249 void clear_packet(const protobuf::ModemTransmission& message);
250 void process_cfg();
251
252 void process_modem_ack(const protobuf::ModemTransmission& ack_msg);
253 void create_network_ack(int ack_src, const google::protobuf::Message& orig_msg,
255
256 // "overload" those from DCCLCodec to allow changing of crypto passphrase
257 std::string encode_repeated(const std::list<QueuedMessage>& msgs);
258 std::list<QueuedMessage> decode_repeated(const std::string& orig_bytes);
259 unsigned size_repeated(const std::list<QueuedMessage>& msgs);
260
261 private:
262 friend class Queue;
// Value returned by modem_id().
// NOTE(review): no in-class initializer here; confirm the constructor sets it.
263 int modem_id_;
// Queues held via shared_ptr and looked up by DCCL id (meta_from_msg indexes this
// map with codec_->id(...)).
264 std::map<unsigned, std::shared_ptr<Queue> > queues_;
265
266 // map frame number onto %queue pointer that contains
267 // the data for this ack
268 std::multimap<unsigned, Queue*> waiting_for_ack_;
269
270 // the first *user* frame sets the tone (dest & ack) for the entire packet (all %modem frames)
271 unsigned packet_ack_{0};
// NOTE(review): unlike packet_ack_, packet_dest_ has no in-class initializer;
// confirm it is assigned before first use.
272 int packet_dest_;
273
// Alias used for the modem-id keyed containers below.
274 typedef int ModemId;
275 std::set<ModemId> network_ack_src_ids_;
276 std::set<ModemId> route_additional_modem_ids_;
277
278 // maps id to crypto passphrase
279 std::map<ModemId, std::string> encrypt_rules_;
280
282
284
// Strings returned by the glog_push_group(), glog_pop_group(), glog_priority_group(),
// glog_out_group() and glog_in_group() accessors respectively.
285 std::string glog_push_group_;
286 std::string glog_pop_group_;
287 std::string glog_priority_group_;
288 std::string glog_out_group_;
289 std::string glog_in_group_;
290
// Static counter shared by all QueueManager objects.
// NOTE(review): its purpose is not visible in this listing (presumably an instance
// count) - confirm in the implementation file.
291 static int count_;
292
// Small helper that records which Manipulator values are attached to which id.
// Backed by a multimap, so one id may carry several manipulators and the same
// (id, manipulator) pair may be stored more than once.
293 class ManipulatorManager
294 {
295 public:
// Records `manip` for `id`. No de-duplication is performed.
296 void add(unsigned id, goby::acomms::protobuf::Manipulator manip)
297 {
298 manips_.insert(std::make_pair(id, manip));
299 }
300
// Returns true if `manip` has been recorded for `id`, false otherwise
// (including when nothing has been recorded for `id` at all).
301 bool has(unsigned id, goby::acomms::protobuf::Manipulator manip) const
302 {
303 using iterator =
304 std::multimap<unsigned int, goby::acomms::protobuf::Manipulator>::const_iterator;
// [first, second) spans every entry stored under `id`.
305 std::pair<iterator, iterator> p = manips_.equal_range(id);
306
307 for (auto it = p.first; it != p.second; ++it)
308 {
309 if (it->second == manip)
310 return true;
311 }
312
313 return false;
314 }
315
// Removes every recorded (id, manipulator) pair.
316 void clear() { manips_.clear(); }
317
318 private:
319 // manipulator multimap (no_encode, no_decode, etc)
320 // maps DCCL ID (unsigned) onto Manipulator enumeration (xml_config.proto)
// NOTE(review): this header includes manipulator.pb.h "for Manipulator", so the
// xml_config.proto reference above may be out of date - confirm.
321 std::multimap<unsigned, goby::acomms::protobuf::Manipulator> manips_;
322 };
323
// The QueueManager's own ManipulatorManager instance.
324 ManipulatorManager manip_manager_;
325};
326
// Stream-insertion operator for QueueManager; returns a std::ostream reference.
// NOTE(review): only the declaration appears in this listing, so the exact output
// format cannot be stated from here.
328std::ostream& operator<<(std::ostream& out, const QueueManager& d);
329
330} // namespace acomms
331
332} // namespace goby
333
334#endif
unsigned id() const
Definition dccl.h:169
Exception class for libdccl.
provides an API to the goby-acomms Queuing Library.
boost::signals2::signal< void(protobuf::QueuedMessageMeta *meta, const google::protobuf::Message &data_msg, int modem_id)> signal_out_route
Used by a router to change next-hop destination (in meta)
const std::string & glog_push_group()
boost::signals2::signal< void(const google::protobuf::Message &orig_msg)> signal_expire
Signals when a message expires (exceeds its time-to-live or ttl) before being sent (if queue....
boost::signals2::signal< void(const protobuf::ModemTransmission &request_msg, google::protobuf::Message *data_msg)> signal_data_on_demand
Forwards the data request to the application layer. This advanced feature is used when queue....
void set_cfg(const protobuf::QueueManagerConfig &cfg)
Set (and overwrite completely if present) the current configuration. (protobuf::QueueManagerConfig de...
void add_queue(const protobuf::QueuedMessageEntry &queue_cfg)
Add a DCCL queue for use with QueueManager. Note that the queue must be added before receiving messag...
~QueueManager()=default
destructor
void handle_modem_receive(const protobuf::ModemTransmission &message)
Receive incoming data from the modem.
const std::string & glog_pop_group()
void info_all(std::ostream *os) const
Writes a human readable summary (including DCCLCodec info) of all loaded queues.
void handle_modem_data_request(protobuf::ModemTransmission *msg)
Finds data to send to the modem.
boost::signals2::signal< void(const protobuf::QueuedMessageMeta &meta, const google::protobuf::Message &data_msg, int modem_id)> signal_in_route
Used by a router to intercept messages and requeue them if desired.
std::string msg_string(const google::protobuf::Descriptor *desc)
protobuf::QueuedMessageMeta meta_from_msg(const google::protobuf::Message &msg)
boost::signals2::signal< void(protobuf::QueueSize size)> signal_queue_size_change
Signals when any queue changes size (message is popped or pushed)
const std::string & glog_in_group()
void push_message(const google::protobuf::Message &new_message)
Push a message (and add the queue if it does not exist)
void push_message(const google::protobuf::Message &new_message, const protobuf::QueuedMessageMeta *meta)
void info(const google::protobuf::Descriptor *desc, std::ostream *os) const
An alternative form for getting information for Queues for message types not known at compile-time ("d...
void flush_queue(const protobuf::QueueFlush &flush)
Flush (delete all messages in) a queue.
void do_work()
Calculates which messages have expired and emits the goby::acomms::QueueManager::signal_expire as nec...
const std::string & glog_priority_group()
void add_queue(const google::protobuf::Descriptor *desc, const protobuf::QueuedMessageEntry &queue_cfg)
Alternative method for adding Queues when using Dynamic Protobuf Messages.
void info(std::ostream *os) const
Writes a human readable summary (including DCCLCodec info) of the queue for the provided DCCL type to...
void merge_cfg(const protobuf::QueueManagerConfig &cfg)
Set (and merge "repeat" fields) the current configuration. (protobuf::QueueManagerConfig defined in a...
boost::signals2::signal< void(const google::protobuf::Message &msg)> signal_receive
Signals when a DCCL message is received.
boost::signals2::signal< void(const protobuf::ModemTransmission &ack_msg, const google::protobuf::Message &orig_msg)> signal_ack
Signals when acknowledgment of proper message receipt has been received. This is only sent for queues...
int modem_id()
The current modem ID (MAC address) of this node.
const std::string & glog_out_group()
std::ostream & operator<<(std::ostream &os, const MACManager &mac)
The global namespace for the Goby project.
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::PROTOBUF_NAMESPACE_ID::MessageOptions, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg