Goby v2
queue.h
1 // Copyright 2009-2018 Toby Schneider (http://gobysoft.org/index.wt/people/toby)
2 // GobySoft, LLC (2013-)
3 // Massachusetts Institute of Technology (2007-2014)
4 // Community contributors (see AUTHORS file)
5 //
6 //
7 // This file is part of the Goby Underwater Autonomy Project Libraries
8 // ("The Goby Libraries").
9 //
10 // The Goby Libraries are free software: you can redistribute them and/or modify
11 // them under the terms of the GNU Lesser General Public License as published by
12 // the Free Software Foundation, either version 2.1 of the License, or
13 // (at your option) any later version.
14 //
15 // The Goby Libraries are distributed in the hope that they will be useful,
16 // but WITHOUT ANY WARRANTY; without even the implied warranty of
17 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 // GNU Lesser General Public License for more details.
19 //
20 // You should have received a copy of the GNU Lesser General Public License
21 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
22 
23 #ifndef Queue20080605H
24 #define Queue20080605H
25 
26 #include <bitset>
27 #include <deque>
28 #include <iostream>
29 #include <list>
30 #include <map>
31 #include <sstream>
32 #include <string>
33 #include <vector>
34 
35 #include <boost/algorithm/string.hpp>
36 #include <boost/any.hpp>
37 
38 #include "goby/acomms/dccl/dccl.h"
39 #include "goby/common/time.h"
40 #include "goby/util/as.h"
41 
42 #include "goby/acomms/acomms_helpers.h"
43 #include "goby/acomms/protobuf/queue.pb.h"
44 
45 namespace goby
46 {
47 namespace acomms
48 {
49 class QueueManager;
50 
// NOTE(review): the declaration line that should precede this brace (listing line 51) is
// absent from this listing. Presumably this is the body of QueuedMessage, the element type
// of std::list<QueuedMessage> used below -- confirm against the real header.
52 {
 // Shared pointer to the google::protobuf::Message carried by this entry.
53  boost::shared_ptr<google::protobuf::Message> dccl_msg;
 // NOTE(review): listing line 54 is also absent. Queue::newest_msg_time() reads a `meta`
 // member of QueuedMessage, so a `meta` field is presumably declared here -- TODO confirm.
55 };
56 
// Iterator into a std::list<QueuedMessage> (the container type of Queue::messages_).
57 typedef std::list<QueuedMessage>::iterator messages_it;
// Iterator into a std::multimap<unsigned, messages_it> (the container type of
// Queue::waiting_for_ack_, which is keyed by frame number).
58 typedef std::multimap<unsigned, messages_it>::iterator waiting_for_ack_it;
59 
// Queue: a list of QueuedMessage entries (messages_) associated with a single
// google::protobuf::Descriptor (desc_), holding a pointer to a QueueManager (parent_).
// Only the inline members are defined in this header; the remaining members are declared
// here and defined elsewhere.
60 class Queue
61 {
62  public:
 // NOTE(review): this constructor declaration is cut off in this listing (listing line 64
 // is absent); the remaining parameter(s) and the closing ");" are not visible here.
63  Queue(const google::protobuf::Descriptor* desc, QueueManager* parent,
65 
 // Adds a message to the queue; the meaning of the bool result is not visible in this header.
66  bool push_message(boost::shared_ptr<google::protobuf::Message> dccl_msg);
 // NOTE(review): this overload is cut off in this listing (listing line 68 is absent); its
 // second parameter is not visible here.
67  bool push_message(boost::shared_ptr<google::protobuf::Message> dccl_msg,
69 
 // Produces a protobuf::QueuedMessageMeta from the given message.
70  protobuf::QueuedMessageMeta meta_from_msg(const google::protobuf::Message& dccl_msg);
71 
 // Looks up `field_name` in `msg` and returns the result wrapped in a boost::any.
 // NOTE(review): the contents of the boost::any when the field is not found are not
 // visible here -- check the implementation.
72  boost::any find_queue_field(const std::string& field_name,
73  const google::protobuf::Message& msg);
74 
 // Returns a QueuedMessage by value for the given frame number.
75  goby::acomms::QueuedMessage give_data(unsigned frame);
 // Pop operations keyed by frame number; pop_message_ack additionally writes to
 // `removed_msg` (passed by non-const reference). Return-value semantics are not visible here.
76  bool pop_message(unsigned frame);
77  bool pop_message_ack(unsigned frame, boost::shared_ptr<google::protobuf::Message>& removed_msg);
78  void stream_for_pop(const QueuedMessage& queued_msg);
79 
 // Returns a vector of messages; presumably those removed because they expired -- TODO confirm.
80  std::vector<boost::shared_ptr<google::protobuf::Message> > expire();
81 
 // Writes results through the `priority` and `last_send_time` out-pointers.
 // NOTE(review): the meaning of the bool result is not visible in this header.
82  bool get_priority_values(double* priority, boost::posix_time::ptime* last_send_time,
83  const protobuf::ModemTransmission& request_msg,
84  const std::string& data);
85 
86  // returns true if empty
87  bool clear_ack_queue(unsigned start_frame);
88 
89  void flush();
90 
 // Number of entries currently held in messages_.
91  size_t size() const { return messages_.size(); }
92 
 // Returns the stored last_send_time_ value.
93  boost::posix_time::ptime last_send_time() const { return last_send_time_; }
94 
 // Time of the entry at the back of messages_ (its meta.time() converted to a ptime via
 // goby::util::as), or a default-constructed ptime when the queue is empty.
95  boost::posix_time::ptime newest_msg_time() const
96  {
97  return size() ? goby::util::as<boost::posix_time::ptime>(messages_.back().meta.time())
98  : boost::posix_time::ptime();
99  }
100 
 // Writes information about this queue to the stream pointed to by `os`.
101  void info(std::ostream* os) const;
102 
 // Full name of the descriptor this queue was given (desc_->full_name()).
103  std::string name() const { return desc_->full_name(); }
104 
 // Stores a copy of `cfg` in cfg_ and then calls process_cfg().
105  void set_cfg(const protobuf::QueuedMessageEntry& cfg)
106  {
107  cfg_ = cfg;
108  process_cfg();
109  }
110  void process_cfg();
111 
 // Returns a const reference to the stored configuration (cfg_). Not declared const.
112  const protobuf::QueuedMessageEntry& queue_message_options() { return cfg_; }
113 
 // Returns the descriptor pointer held by this queue.
114  const google::protobuf::Descriptor* descriptor() const { return desc_; }
115 
 // Returns the id reported by the DCCLCodec singleton for desc_. Not declared const.
116  int id() { return goby::acomms::DCCLCodec::get()->id(desc_); }
117 
118  private:
119  waiting_for_ack_it find_ack_value(messages_it it_to_find);
120  messages_it next_message_it();
121 
122  void set_latest_metadata(const google::protobuf::FieldDescriptor* field,
123  const boost::any& field_value, const boost::any& wire_value);
124 
125  private:
 // Copy assignment and copy construction are declared private with no definition visible
 // here; presumably this makes Queue non-copyable.
126  Queue& operator=(const Queue&);
127  Queue(const Queue&);
128 
129  const google::protobuf::Descriptor* desc_;
130  QueueManager* parent_;
 // NOTE(review): listing line 131 is absent. cfg_ (used by set_cfg() and
 // queue_message_options()) has no visible declaration in this listing, so it was
 // presumably declared on that line -- confirm against the real header.
132 
133  // maps role onto FieldDescriptor::full_name() or empty string if static role
134  std::map<protobuf::QueuedMessageEntry::RoleType, std::string> roles_;
135 
136  boost::posix_time::ptime last_send_time_;
137 
 // The queued entries; size() and newest_msg_time() read from this list.
138  std::list<QueuedMessage> messages_;
139 
140  // map frame number onto messages list iterator
141  // can have multiples in the same frame now
142  std::multimap<unsigned, messages_it> waiting_for_ack_;
143 
144  protobuf::QueuedMessageMeta static_meta_;
145 };
// Stream insertion operator for Queue; declared here, defined outside this header.
146 std::ostream& operator<<(std::ostream& os, const Queue& oq);
147 } // namespace acomms
148 
149 } // namespace goby
150 #endif
QueueManager: provides an API to the goby-acomms Queuing Library.
Definition: queue_manager.h:49
static DCCLCodec * get()
DCCLCodec is a singleton class; use this to get a pointer to the class.
Definition: dccl.h:124
The global namespace for the Goby project.