Goby3  3.1.4
2024.02.22
dynamic_buffer.h
Go to the documentation of this file.
1 // Copyright 2019-2023:
2 // GobySoft, LLC (2013-)
3 // Community contributors (see AUTHORS file)
4 // File authors:
5 // Toby Schneider <toby@gobysoft.org>
6 //
7 //
8 // This file is part of the Goby Underwater Autonomy Project Libraries
9 // ("The Goby Libraries").
10 //
11 // The Goby Libraries are free software: you can redistribute them and/or modify
12 // them under the terms of the GNU Lesser General Public License as published by
13 // the Free Software Foundation, either version 2.1 of the License, or
14 // (at your option) any later version.
15 //
16 // The Goby Libraries are distributed in the hope that they will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU Lesser General Public License for more details.
20 //
21 // You should have received a copy of the GNU Lesser General Public License
22 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
23 
24 #ifndef GOBY_ACOMMS_BUFFER_DYNAMIC_BUFFER_H
25 #define GOBY_ACOMMS_BUFFER_DYNAMIC_BUFFER_H
26 
27 #include <deque>
28 #include <type_traits>
29 #include <unordered_map>
30 #include <vector>
31 
34 #include "goby/exception.h"
35 #include "goby/time/convert.h"
36 #include "goby/time/steady_clock.h"
37 #include "goby/util/debug_logger.h"
38 
39 namespace goby
40 {
41 namespace acomms
42 {
43 namespace protobuf
44 {
45 inline bool operator==(const DynamicBufferConfig& a, const DynamicBufferConfig& b)
46 {
47  return a.SerializeAsString() == b.SerializeAsString();
48 }
49 
50 } // namespace protobuf
51 
53 {
54  public:
55  DynamicBufferNoDataException() : goby::Exception("No queues have data available") {}
57 };
58 
/// \brief Size of a datum, defined as whatever the container's own size() reports.
/// \tparam Container any type providing a size() member
/// \param c container to measure
/// \return c.size(), converted to size_t
template <typename Container> size_t data_size(const Container& c)
{
    const auto element_count = c.size();
    return element_count;
}
60 
62 template <typename T, typename Clock = goby::time::SteadyClock> class DynamicSubBuffer
63 {
64  public:
65  using size_type = typename std::deque<T>::size_type;
66 
67  struct Value
68  {
69  typename Clock::time_point push_time;
70  T data;
71  bool operator==(const Value& a) const { return a.push_time == push_time && a.data == data; }
72  };
73 
76  : DynamicSubBuffer(std::vector<goby::acomms::protobuf::DynamicBufferConfig>(1, cfg))
77  {
78  }
79 
88  DynamicSubBuffer(const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
89  {
90  update(cfgs);
91  }
92 
94 
96  void update(const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
97  {
99  if (cfgs.empty())
100  throw(goby::Exception("Configuration vector must not be empty for DynamicSubBuffer"));
101 
102  cfg_.Clear();
103 
104  // extract these types from the Protobuf message
105  using ttl_type =
106  std::result_of<decltype (&DynamicBufferConfig::ttl)(DynamicBufferConfig)>::type;
107  using value_base_type =
108  std::result_of<decltype (&DynamicBufferConfig::value_base)(DynamicBufferConfig)>::type;
109 
110  ttl_type ttl_sum = 0;
111  ttl_type ttl_divisor = 0;
112  value_base_type value_base_sum = 0;
113  value_base_type value_base_divisor = 0;
114 
115  for (const auto& cfg : cfgs)
116  {
117  if (cfg.has_ack_required() && (!cfg_.has_ack_required() || cfg.ack_required()))
119  if (cfg.has_blackout_time() &&
120  (!cfg_.has_blackout_time() || cfg.blackout_time() < cfg_.blackout_time()))
122  if (cfg.has_max_queue() &&
123  (!cfg_.has_max_queue() || cfg.max_queue() > cfg_.max_queue()))
124  cfg_.set_max_queue(cfg.max_queue());
125  if (cfg.has_newest_first() && (!cfg_.has_newest_first() || cfg.newest_first()))
127 
128  if (cfg.has_ttl())
129  {
130  ttl_sum += cfg.ttl();
131  ++ttl_divisor;
132  }
133 
134  if (cfg.has_value_base())
135  {
136  value_base_sum += cfg.value_base();
137  ++value_base_divisor;
138  }
139  }
140 
141  if (ttl_divisor > 0)
142  cfg_.set_ttl(ttl_sum / ttl_divisor);
143  if (value_base_divisor > 0)
144  cfg_.set_value_base(value_base_sum / value_base_divisor);
145  }
146 
148  const goby::acomms::protobuf::DynamicBufferConfig& cfg() const { return cfg_; }
149 
156  Value& top(typename Clock::time_point reference = Clock::now(),
157  typename Clock::duration ack_timeout = std::chrono::microseconds(0))
158  {
159  for (auto& datum_pair : data_)
160  {
161  auto& datum_last_access = datum_pair.first;
162  if (datum_last_access == zero_point_ || datum_last_access + ack_timeout < reference)
163  {
164  last_access_ = reference;
165  datum_last_access = last_access_;
166  return datum_pair.second;
167  }
168  }
170  }
171 
173  size_t top_size(typename Clock::time_point reference = Clock::now(),
174  typename Clock::duration ack_timeout = std::chrono::microseconds(0)) const
175  {
176  for (const auto& datum_pair : data_)
177  {
178  const auto& datum_last_access = datum_pair.first;
179  if (datum_last_access == zero_point_ || datum_last_access + ack_timeout < reference)
180  return data_size(datum_pair.second.data);
181  }
183  }
184 
186  bool
187  all_waiting_for_ack(typename Clock::time_point reference = Clock::now(),
188  typename Clock::duration ack_timeout = std::chrono::microseconds(0)) const
189  {
190  for (const auto& datum_pair : data_)
191  {
192  const auto& datum_last_access = datum_pair.first;
193  if (datum_last_access == zero_point_ || datum_last_access + ack_timeout < reference)
194  return false;
195  }
196  return true;
197  }
198 
199  enum class ValueResult
200  {
202  EMPTY,
203  IN_BLACKOUT,
206  };
207 
214  std::pair<double, ValueResult>
215  top_value(typename Clock::time_point reference = Clock::now(),
216  size_type max_bytes = std::numeric_limits<size_type>::max(),
217  typename Clock::duration ack_timeout = std::chrono::microseconds(0)) const
218  {
219  if (empty())
220  return std::make_pair(-std::numeric_limits<double>::infinity(), ValueResult::EMPTY);
221  else if (in_blackout(reference))
222  return std::make_pair(-std::numeric_limits<double>::infinity(),
224  else if (top_size(reference, ack_timeout) > max_bytes)
225  return std::make_pair(-std::numeric_limits<double>::infinity(),
227  else if (all_waiting_for_ack(reference, ack_timeout))
228  return std::make_pair(-std::numeric_limits<double>::infinity(),
230 
231  using Duration = std::chrono::microseconds;
232 
233  double dt = std::chrono::duration_cast<Duration>(reference - last_access_).count();
234  double ttl = goby::time::convert_duration<Duration>(cfg_.ttl_with_units()).count();
235  double v_b = cfg_.value_base();
236 
237  double v = v_b * dt / ttl;
238  return std::make_pair(v, ValueResult::VALUE_PROVIDED);
239  }
240 
244  bool in_blackout(typename Clock::time_point reference = Clock::now()) const
245  {
246  auto blackout =
247  goby::time::convert_duration<typename Clock::duration>(cfg_.blackout_time_with_units());
248 
249  return reference <= (last_access_ + blackout);
250  }
252  bool empty() const { return data_.empty(); }
253 
255  size_type size() const { return data_.size(); }
256 
258  void pop() { data_.pop_front(); }
259 
265  std::vector<Value> push(const T& t, typename Clock::time_point reference = Clock::now())
266  {
267  std::vector<Value> exceeded;
268 
269  if (cfg_.newest_first())
270  data_.push_front(std::make_pair(zero_point_, Value({reference, t})));
271  else
272  data_.push_back(std::make_pair(zero_point_, Value({reference, t})));
273 
274  while (data_.size() > cfg_.max_queue())
275  {
276  exceeded.push_back(data_.back().second);
277  data_.pop_back();
278  }
279  return exceeded;
280  }
281 
285  std::vector<Value> expire(typename Clock::time_point reference = Clock::now())
286  {
287  std::vector<Value> expired;
288 
289  auto ttl = goby::time::convert_duration<typename Clock::duration>(cfg_.ttl_with_units());
290  if (cfg_.newest_first())
291  {
292  while (!data_.empty() && reference > (data_.back().second.push_time + ttl))
293  {
294  expired.push_back(data_.back().second);
295  data_.pop_back();
296  }
297  }
298  else
299  {
300  while (!data_.empty() && reference > (data_.front().second.push_time + ttl))
301  {
302  expired.push_back(data_.front().second);
303  data_.pop_front();
304  }
305  }
306  return expired;
307  }
308 
313  bool erase(const Value& value)
314  {
315  // start at the beginning as we are most likely to want to erase elements we recently asked for with top()
316 
317  for (auto it = data_.begin(), end = data_.end(); it != end; ++it)
318  {
319  const auto& datum_pair = it->second;
320  if (datum_pair == value)
321  {
322  data_.erase(it);
323  return true;
324  }
325 
326  // if these are true, we're not going to find it so stop looking
327  if (cfg_.newest_first() && datum_pair.push_time < value.push_time)
328  break;
329  else if (!cfg_.newest_first() && datum_pair.push_time > value.push_time)
330  break;
331  }
332  return false;
333  }
334 
335  private:
337 
338  // pair of last send -> value
339  std::deque<std::pair<typename Clock::time_point, Value>> data_;
340  typename Clock::time_point last_access_{Clock::now()};
341 
342  typename Clock::time_point zero_point_{std::chrono::seconds(0)};
343 };
344 
346 template <typename T, typename Clock = goby::time::SteadyClock> class DynamicBuffer
347 {
348  public:
349  DynamicBuffer() : DynamicBuffer(++count_) {}
351  {
352  glog_priority_group_ = "goby::acomms::buffer::priority::" + std::to_string(id);
353  goby::glog.add_group(glog_priority_group_, util::Colors::yellow);
354  }
356 
357  using subbuffer_id_type = std::string;
360 
361  struct Value
362  {
365  typename Clock::time_point push_time;
366  T data;
367  };
368 
375  void create(modem_id_type dest_id, const subbuffer_id_type& sub_id,
377  {
378  create(dest_id, sub_id, std::vector<goby::acomms::protobuf::DynamicBufferConfig>(1, cfg));
379  }
380 
387  void create(modem_id_type dest_id, const subbuffer_id_type& sub_id,
388  const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
389  {
390  if (sub_.count(dest_id) && sub_.at(dest_id).count(sub_id))
391  throw(goby::Exception("Subbuffer ID: " + sub_id + " already exists."));
392 
393  sub_[dest_id].insert(std::make_pair(sub_id, DynamicSubBuffer<T, Clock>(cfgs)));
394  }
395 
401  void replace(modem_id_type dest_id, const subbuffer_id_type& sub_id,
403  {
404  replace(dest_id, sub_id, std::vector<goby::acomms::protobuf::DynamicBufferConfig>(1, cfg));
405  }
406 
412  void replace(modem_id_type dest_id, const subbuffer_id_type& sub_id,
413  const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
414  {
415  remove(dest_id, sub_id);
416  create(dest_id, sub_id, cfgs);
417  }
418 
424  void update(modem_id_type dest_id, const subbuffer_id_type& sub_id,
426  {
427  update(dest_id, sub_id, std::vector<goby::acomms::protobuf::DynamicBufferConfig>(1, cfg));
428  }
429 
435  void update(modem_id_type dest_id, const subbuffer_id_type& sub_id,
436  const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
437  {
438  auto it = sub_[dest_id].find(sub_id);
439  if (it != sub_[dest_id].end())
440  it->second.update(cfgs);
441  else
442  create(dest_id, sub_id, cfgs);
443  }
444 
449  void remove(modem_id_type dest_id, const subbuffer_id_type& sub_id)
450  {
451  sub_[dest_id].erase(sub_id);
452  }
453 
459  std::vector<Value> push(const Value& fvt)
460  {
461  std::vector<Value> exceeded;
462  auto sub_exceeded = sub(fvt.modem_id, fvt.subbuffer_id).push(fvt.data, fvt.push_time);
463  for (const auto& e : sub_exceeded)
464  exceeded.push_back({fvt.modem_id, fvt.subbuffer_id, e.push_time, e.data});
465  return exceeded;
466  }
467 
469  bool empty() const
470  {
471  for (const auto& sub_id_p : sub_)
472  {
473  for (const auto& sub_p : sub_id_p.second)
474  {
475  if (!sub_p.second.empty())
476  return false;
477  }
478  }
479 
480  return true;
481  }
482 
484  size_type size() const
485  {
486  size_type size = 0;
487  for (const auto& sub_id_p : sub_)
488  {
489  for (const auto& sub_p : sub_id_p.second) size += sub_p.second.size();
490  }
491  return size;
492  }
493 
501  size_type max_bytes = std::numeric_limits<size_type>::max(),
502  typename Clock::duration ack_timeout = std::chrono::microseconds(0))
503  {
504  using goby::glog;
505 
506  glog.is_debug1() &&
507  glog << group(glog_priority_group_) << "Starting priority contest (dest: "
508  << (dest_id == goby::acomms::QUERY_DESTINATION_ID ? std::string("?")
509  : std::to_string(dest_id))
510  << ", max_bytes: " << max_bytes << "):" << std::endl;
511 
512  typename std::unordered_map<subbuffer_id_type, DynamicSubBuffer<T, Clock>>::iterator
513  winning_sub;
514  double winning_value = -std::numeric_limits<double>::infinity();
515 
516  auto now = Clock::now();
517 
518  if (dest_id != goby::acomms::QUERY_DESTINATION_ID && !sub_.count(dest_id))
520 
521  // if QUERY_DESTINATION_ID, search all subbuffers, otherwise just search the ones that were specified by dest_id
522  for (auto sub_id_it = (dest_id == goby::acomms::QUERY_DESTINATION_ID) ? sub_.begin()
523  : sub_.find(dest_id),
524  sub_id_end = (dest_id == goby::acomms::QUERY_DESTINATION_ID)
525  ? sub_.end()
526  : ++sub_.find(dest_id);
527  sub_id_it != sub_id_end; ++sub_id_it)
528  {
529  for (auto sub_it = sub_id_it->second.begin(), sub_end = sub_id_it->second.end();
530  sub_it != sub_end; ++sub_it)
531  {
532  double value;
534  std::tie(value, result) = sub_it->second.top_value(now, max_bytes, ack_timeout);
535 
536  std::string value_or_reason;
537  switch (result)
538  {
540  value_or_reason = std::to_string(value);
541  break;
542 
544  value_or_reason = "empty";
545  break;
546 
548  value_or_reason = "blackout";
549  break;
550 
552  value_or_reason = "too large";
553  break;
554 
556  value_or_reason = "ack wait";
557  break;
558  }
559 
560  glog.is_debug1() && glog << group(glog_priority_group_) << "\t" << sub_it->first
561  << " [dest: " << sub_id_it->first
562  << ", n: " << sub_it->second.size()
563  << "]: " << value_or_reason << std::endl;
564 
565  if (value > winning_value)
566  {
567  winning_value = value;
568  winning_sub = sub_it;
569  dest_id = sub_id_it->first;
570  }
571  }
572  }
573 
574  if (winning_value == -std::numeric_limits<double>::infinity())
576 
577  const auto& top_p = winning_sub->second.top(now, ack_timeout);
578  glog.is_debug1() && glog << group(glog_priority_group_) << "Winner: " << winning_sub->first
579  << " (" << data_size(top_p.data) << "B)" << std::endl;
580 
581  return {dest_id, winning_sub->first, top_p.push_time, top_p.data};
582  }
583 
589  bool erase(const Value& value)
590  {
591  return sub(value.modem_id, value.subbuffer_id).erase({value.push_time, value.data});
592  }
593 
597  std::vector<Value> expire()
598  {
599  auto now = Clock::now();
600  std::vector<Value> expired;
601  for (auto& sub_id_p : sub_)
602  {
603  for (auto& sub_p : sub_id_p.second)
604  {
605  auto sub_expired = sub_p.second.expire(now);
606  for (const auto& e : sub_expired)
607  expired.push_back({sub_id_p.first, sub_p.first, e.push_time, e.data});
608  }
609  }
610  return expired;
611  }
612 
617  {
618  if (!sub_.count(dest_id) || !sub_.at(dest_id).count(sub_id))
619  throw(goby::Exception("Subbuffer ID: " + sub_id +
620  " does not exist, must call create(...) first."));
621  return sub_.at(dest_id).at(sub_id);
622  }
623 
624  private:
625  // destination -> subbuffer id (group/type) -> subbuffer
626  std::map<modem_id_type, std::unordered_map<subbuffer_id_type, DynamicSubBuffer<T, Clock>>> sub_;
627 
628  std::string glog_priority_group_;
629  static std::atomic<int> count_;
630 
631 }; // class DynamicBuffer
632 
633 template <typename T, typename Clock> std::atomic<int> DynamicBuffer<T, Clock>::count_(0);
634 
635 } // namespace acomms
636 } // namespace goby
637 
638 #endif
goby::acomms::DynamicBuffer::empty
bool empty() const
Is this buffer empty (that is, are all subbuffers empty)?
Definition: dynamic_buffer.h:469
goby::acomms::DynamicBufferNoDataException
Definition: dynamic_buffer.h:52
goby::acomms::DynamicSubBuffer::empty
bool empty() const
Returns if this queue is empty.
Definition: dynamic_buffer.h:252
goby::acomms::DynamicSubBuffer::DynamicSubBuffer
DynamicSubBuffer(const goby::acomms::protobuf::DynamicBufferConfig &cfg)
Create a subbuffer with the given configuration.
Definition: dynamic_buffer.h:75
goby::acomms::DynamicSubBuffer::Value::push_time
Clock::time_point push_time
Definition: dynamic_buffer.h:69
goby::acomms::DynamicBuffer::Value
Definition: dynamic_buffer.h:361
goby::acomms::protobuf::DynamicBufferConfig::set_value_base
void set_value_base(double value)
Definition: buffer.pb.h:416
goby::acomms::DynamicBuffer::push
std::vector< Value > push(const Value &fvt)
Push a new message to the buffer.
Definition: dynamic_buffer.h:459
goby::acomms::protobuf::DynamicBufferConfig::max_queue
::google::protobuf::uint32 max_queue() const
Definition: buffer.pb.h:340
goby
The global namespace for the Goby project.
Definition: acomms_constants.h:33
goby::acomms::DynamicBuffer::erase
bool erase(const Value &value)
Erase a value.
Definition: dynamic_buffer.h:589
goby::acomms::DynamicSubBuffer::ValueResult::IN_BLACKOUT
@ IN_BLACKOUT
goby::acomms::DynamicSubBuffer::top
Value & top(typename Clock::time_point reference=Clock::now(), typename Clock::duration ack_timeout=std::chrono::microseconds(0))
Returns the value at the top of the queue that hasn't been sent within ack_timeout.
Definition: dynamic_buffer.h:156
goby::acomms::protobuf::DynamicBufferConfig::ttl
double ttl() const
Definition: buffer.pb.h:388
goby::acomms::DynamicBuffer::sub
DynamicSubBuffer< T, Clock > & sub(modem_id_type dest_id, const subbuffer_id_type &sub_id)
Reference a given subbuffer.
Definition: dynamic_buffer.h:616
goby::acomms::protobuf::DynamicBufferConfig::set_ttl
void set_ttl(double value)
Definition: buffer.pb.h:392
goby::acomms::DynamicBuffer::remove
void remove(modem_id_type dest_id, const subbuffer_id_type &sub_id)
Remove an existing subbuffer.
Definition: dynamic_buffer.h:449
goby::acomms::DynamicSubBuffer
Represents a time-dependent priority queue for a single group of messages (e.g. for a single DCCL ID)
Definition: dynamic_buffer.h:62
goby::acomms::DynamicSubBuffer::update
void update(const std::vector< goby::acomms::protobuf::DynamicBufferConfig > &cfgs)
Update the configurations without clearing the buffer.
Definition: dynamic_buffer.h:96
goby::acomms::DynamicBuffer::Value::data
T data
Definition: dynamic_buffer.h:366
goby::acomms::DynamicBuffer::Value::subbuffer_id
subbuffer_id_type subbuffer_id
Definition: dynamic_buffer.h:364
goby::util::e
constexpr T e
Definition: constants.h:35
goby::acomms::DynamicBuffer::create
void create(modem_id_type dest_id, const subbuffer_id_type &sub_id, const std::vector< goby::acomms::protobuf::DynamicBufferConfig > &cfgs)
Create a new subbuffer merging the given configuration (See DynamicSubBuffer() for details)
Definition: dynamic_buffer.h:387
goby::acomms::DynamicSubBuffer< T, goby::time::SteadyClock >::ValueResult
ValueResult
Definition: dynamic_buffer.h:199
group
goby::util::logger::GroupSetter group(std::string n)
Definition: logger_manipulators.h:134
goby::acomms::DynamicSubBuffer::top_size
size_t top_size(typename Clock::time_point reference=Clock::now(), typename Clock::duration ack_timeout=std::chrono::microseconds(0)) const
Returns the size (in bytes) of the top of the queue that hasn't been sent within ack_timeout.
Definition: dynamic_buffer.h:173
goby::acomms::DynamicBuffer::Value::push_time
Clock::time_point push_time
Definition: dynamic_buffer.h:365
goby::acomms::protobuf::DynamicBufferConfig
Definition: buffer.pb.h:75
goby::acomms::DynamicSubBuffer::expire
std::vector< Value > expire(typename Clock::time_point reference=Clock::now())
Erase any values that have exceeded their time-to-live.
Definition: dynamic_buffer.h:285
goby::acomms::DynamicBuffer::update
void update(modem_id_type dest_id, const subbuffer_id_type &sub_id, const std::vector< goby::acomms::protobuf::DynamicBufferConfig > &cfgs)
Update an existing subbuffer without removing the messages (or creates the buffer if it doesn't already exist)
Definition: dynamic_buffer.h:435
goby::acomms::protobuf::DynamicBufferConfig::value_base
double value_base() const
Definition: buffer.pb.h:412
goby::acomms::DynamicSubBuffer::push
std::vector< Value > push(const T &t, typename Clock::time_point reference=Clock::now())
Push a value to the queue.
Definition: dynamic_buffer.h:265
goby::acomms::DynamicSubBuffer< T, goby::time::SteadyClock >::size_type
typename std::deque< T >::size_type size_type
Definition: dynamic_buffer.h:65
goby::util::FlexOstream::add_group
void add_group(const std::string &name, Colors::Color color=Colors::nocolor, const std::string &description="")
Add another group to the logger. A group provides related manipulator for categorizing log messages.
goby::acomms::DynamicBuffer::Value::modem_id
modem_id_type modem_id
Definition: dynamic_buffer.h:363
goby::acomms::DynamicBuffer::DynamicBuffer
DynamicBuffer()
Definition: dynamic_buffer.h:349
goby::acomms::DynamicSubBuffer::Value::operator==
bool operator==(const Value &a) const
Definition: dynamic_buffer.h:71
goby::acomms::protobuf::DynamicBufferConfig::newest_first
bool newest_first() const
Definition: buffer.pb.h:364
goby::util::FlexOstream::is_debug1
bool is_debug1()
Definition: flex_ostream.h:84
goby::acomms::protobuf::DynamicBufferConfig::ack_required
bool ack_required() const
Definition: buffer.pb.h:292
goby::acomms::DynamicBuffer::expire
std::vector< Value > expire()
Erase any values that have exceeded their time-to-live.
Definition: dynamic_buffer.h:597
goby::acomms::protobuf::DynamicBufferConfig::ttl_with_units
Quantity ttl_with_units() const
Definition: buffer.pb.h:235
goby::acomms::DynamicBuffer::replace
void replace(modem_id_type dest_id, const subbuffer_id_type &sub_id, const goby::acomms::protobuf::DynamicBufferConfig &cfg)
Replace an existing subbuffer with the given configuration (any messages in the subbuffer will be era...
Definition: dynamic_buffer.h:401
goby::acomms::DynamicSubBuffer::ValueResult::NEXT_MESSAGE_TOO_LARGE
@ NEXT_MESSAGE_TOO_LARGE
goby::acomms::DynamicSubBuffer::ValueResult::ALL_MESSAGES_WAITING_FOR_ACK
@ ALL_MESSAGES_WAITING_FOR_ACK
goby::acomms::DynamicSubBuffer::~DynamicSubBuffer
~DynamicSubBuffer()
Definition: dynamic_buffer.h:93
buffer.pb.h
goby::acomms::protobuf::DynamicBufferConfig::Clear
void Clear() final
goby::acomms::DynamicSubBuffer::Value::data
T data
Definition: dynamic_buffer.h:70
goby::acomms::DynamicBuffer::~DynamicBuffer
~DynamicBuffer()
Definition: dynamic_buffer.h:355
steady_clock.h
goby::acomms::protobuf::DynamicBufferConfig::has_max_queue
bool has_max_queue() const
Definition: buffer.pb.h:327
to_string
NLOHMANN_BASIC_JSON_TPL_DECLARATION std::string to_string(const NLOHMANN_BASIC_JSON_TPL &j)
user-defined to_string function for JSON values
Definition: json.hpp:24301
goby::acomms::DynamicBuffer::top
Value top(modem_id_type dest_id=goby::acomms::QUERY_DESTINATION_ID, size_type max_bytes=std::numeric_limits< size_type >::max(), typename Clock::duration ack_timeout=std::chrono::microseconds(0))
Returns the top value in a priority contest between all subbuffers.
Definition: dynamic_buffer.h:500
goby::acomms::protobuf::DynamicBufferConfig::has_newest_first
bool has_newest_first() const
Definition: buffer.pb.h:351
goby::acomms::DynamicBuffer< goby::middleware::protobuf::SerializerTransporterMessage >::size_type
typename DynamicSubBuffer< goby::middleware::protobuf::SerializerTransporterMessage, goby::time::SteadyClock >::size_type size_type
Definition: dynamic_buffer.h:358
goby::acomms::DynamicSubBuffer::DynamicSubBuffer
DynamicSubBuffer(const std::vector< goby::acomms::protobuf::DynamicBufferConfig > &cfgs)
Create a subbuffer merging two or more configuration objects.
Definition: dynamic_buffer.h:88
jwt::json::type
type
Generic JSON types used in JWTs.
Definition: jwt.h:2071
goby::acomms::QUERY_DESTINATION_ID
constexpr int QUERY_DESTINATION_ID
special modem id used internally to goby-acomms for indicating that the MAC layer (amac) is agnostic ...
Definition: acomms_constants.h:46
goby::acomms::DynamicBufferNoDataException::~DynamicBufferNoDataException
~DynamicBufferNoDataException()
Definition: dynamic_buffer.h:56
goby::acomms::DynamicBuffer::replace
void replace(modem_id_type dest_id, const subbuffer_id_type &sub_id, const std::vector< goby::acomms::protobuf::DynamicBufferConfig > &cfgs)
Replace an existing subbuffer merging the given configuration (See DynamicSubBuffer() for details)
Definition: dynamic_buffer.h:412
goby::acomms::DynamicSubBuffer::size
size_type size() const
Retrieves the size of the queue.
Definition: dynamic_buffer.h:255
convert.h
goby::acomms::data_size
size_t data_size(const Container &c)
Definition: dynamic_buffer.h:59
goby::acomms::DynamicBuffer
Represents a time-dependent priority queue for several groups of messages (multiple DynamicSubBuffers...
Definition: dynamic_buffer.h:346
goby::acomms::DynamicBuffer::DynamicBuffer
DynamicBuffer(int id)
Definition: dynamic_buffer.h:350
goby::acomms::protobuf::DynamicBufferConfig::has_ack_required
bool has_ack_required() const
Definition: buffer.pb.h:279
debug_logger.h
goby::acomms::DynamicBuffer::update
void update(modem_id_type dest_id, const subbuffer_id_type &sub_id, const goby::acomms::protobuf::DynamicBufferConfig &cfg)
Update an existing subbuffer without removing the messages.
Definition: dynamic_buffer.h:424
goby::acomms::DynamicSubBuffer::all_waiting_for_ack
bool all_waiting_for_ack(typename Clock::time_point reference=Clock::now(), typename Clock::duration ack_timeout=std::chrono::microseconds(0)) const
returns true if all messages have been sent within ack_timeout of the reference provided and thus non...
Definition: dynamic_buffer.h:187
goby::acomms::protobuf::DynamicBufferConfig::blackout_time_with_units
Quantity blackout_time_with_units() const
Definition: buffer.pb.h:220
goby::acomms::protobuf::DynamicBufferConfig::set_newest_first
void set_newest_first(bool value)
Definition: buffer.pb.h:368
goby::acomms::DynamicSubBuffer::ValueResult::VALUE_PROVIDED
@ VALUE_PROVIDED
goby::acomms::DynamicBuffer< goby::middleware::protobuf::SerializerTransporterMessage >::subbuffer_id_type
std::string subbuffer_id_type
Definition: dynamic_buffer.h:357
goby::acomms::protobuf::DynamicBufferConfig::has_blackout_time
bool has_blackout_time() const
Definition: buffer.pb.h:303
goby::acomms::DynamicBuffer::create
void create(modem_id_type dest_id, const subbuffer_id_type &sub_id, const goby::acomms::protobuf::DynamicBufferConfig &cfg)
Create a new subbuffer with the given configuration.
Definition: dynamic_buffer.h:375
goby::acomms::protobuf::DynamicBufferConfig::set_blackout_time
void set_blackout_time(double value)
Definition: buffer.pb.h:320
goby::acomms::DynamicSubBuffer::cfg
const goby::acomms::protobuf::DynamicBufferConfig & cfg() const
Return the aggregate configuration.
Definition: dynamic_buffer.h:148
goby::acomms::DynamicSubBuffer::pop
void pop()
Pop the value on the top of the queue.
Definition: dynamic_buffer.h:258
goby::acomms::DynamicSubBuffer::Value
Definition: dynamic_buffer.h:67
goby::acomms::DynamicBufferNoDataException::DynamicBufferNoDataException
DynamicBufferNoDataException()
Definition: dynamic_buffer.h:55
goby::Exception
simple exception class for goby applications
Definition: exception.h:34
exception.h
goby::glog
util::FlexOstream glog
Access the Goby logger through this object.
goby::acomms::protobuf::DynamicBufferConfig::set_ack_required
void set_ack_required(bool value)
Definition: buffer.pb.h:296
google::protobuf::MessageLite::SerializeAsString
string SerializeAsString() const
goby::acomms::protobuf::DynamicBufferConfig::set_max_queue
void set_max_queue(::google::protobuf::uint32 value)
Definition: buffer.pb.h:344
acomms_constants.h
goby::acomms::protobuf::DynamicBufferConfig::has_value_base
bool has_value_base() const
Definition: buffer.pb.h:399
goby::util::Colors::yellow
@ yellow
Definition: term_color.h:118
goby::acomms::DynamicSubBuffer::in_blackout
bool in_blackout(typename Clock::time_point reference=Clock::now()) const
Returns if buffer is in blackout.
Definition: dynamic_buffer.h:244
goby::acomms::DynamicSubBuffer::top_value
std::pair< double, ValueResult > top_value(typename Clock::time_point reference=Clock::now(), size_type max_bytes=std::numeric_limits< size_type >::max(), typename Clock::duration ack_timeout=std::chrono::microseconds(0)) const
Provides the numerical priority value based on this subbuffer's base priority, time-to-live (ttl) and...
Definition: dynamic_buffer.h:215
goby::acomms::protobuf::DynamicBufferConfig::has_ttl
bool has_ttl() const
Definition: buffer.pb.h:375
goby::acomms::protobuf::operator==
bool operator==(const ModemTransmission &a, const ModemTransmission &b)
Definition: mac_manager.h:145
goby::acomms::DynamicSubBuffer::ValueResult::EMPTY
@ EMPTY
goby::acomms::DynamicBuffer::size
size_type size() const
Size of the buffer (that is, sum of the subbuffer sizes)
Definition: dynamic_buffer.h:484
int
goby::acomms::DynamicSubBuffer::erase
bool erase(const Value &value)
Erase a value.
Definition: dynamic_buffer.h:313
goby::acomms::protobuf::DynamicBufferConfig::blackout_time
double blackout_time() const
Definition: buffer.pb.h:316