Goby3  3.1.5a
2024.05.23
dynamic_buffer.h
Go to the documentation of this file.
1 // Copyright 2019-2024:
2 // GobySoft, LLC (2013-)
3 // Community contributors (see AUTHORS file)
4 // File authors:
5 // Toby Schneider <toby@gobysoft.org>
6 //
7 //
8 // This file is part of the Goby Underwater Autonomy Project Libraries
9 // ("The Goby Libraries").
10 //
11 // The Goby Libraries are free software: you can redistribute them and/or modify
12 // them under the terms of the GNU Lesser General Public License as published by
13 // the Free Software Foundation, either version 2.1 of the License, or
14 // (at your option) any later version.
15 //
16 // The Goby Libraries are distributed in the hope that they will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU Lesser General Public License for more details.
20 //
21 // You should have received a copy of the GNU Lesser General Public License
22 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
23 
24 #ifndef GOBY_ACOMMS_BUFFER_DYNAMIC_BUFFER_H
25 #define GOBY_ACOMMS_BUFFER_DYNAMIC_BUFFER_H
26 
27 #include <deque>
28 #include <type_traits>
29 #include <unordered_map>
30 #include <vector>
31 
34 #include "goby/exception.h"
35 #include "goby/time/convert.h"
36 #include "goby/time/steady_clock.h"
37 #include "goby/util/debug_logger.h"
38 
39 namespace goby
40 {
41 namespace acomms
42 {
43 namespace protobuf
44 {
45 inline bool operator==(const DynamicBufferConfig& a, const DynamicBufferConfig& b)
46 {
47  return a.SerializeAsString() == b.SerializeAsString();
48 }
49 
50 } // namespace protobuf
51 
53 {
54  public:
55  DynamicBufferNoDataException(const std::string& reason)
56  : goby::Exception("No queues have data available: " + reason)
57  {
58  }
60 };
61 
62 template <typename Container> size_t data_size(const Container& c) { return c.size(); }
63 
65 template <typename T, typename Clock = goby::time::SteadyClock> class DynamicSubBuffer
66 {
67  public:
68  using size_type = typename std::deque<T>::size_type;
69 
70  struct Value
71  {
72  typename Clock::time_point push_time;
73  T data;
74  bool operator==(const Value& a) const { return a.push_time == push_time && a.data == data; }
75  };
76 
79  : DynamicSubBuffer(std::vector<goby::acomms::protobuf::DynamicBufferConfig>(1, cfg))
80  {
81  }
82 
91  DynamicSubBuffer(const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
92  {
93  update(cfgs);
94  }
95 
97 
99  void update(const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
100  {
102  if (cfgs.empty())
103  throw(goby::Exception("Configuration vector must not be empty for DynamicSubBuffer"));
104 
105  cfg_.Clear();
106 
107  // extract these types from the Protobuf message
108  using ttl_type =
109  std::result_of<decltype (&DynamicBufferConfig::ttl)(DynamicBufferConfig)>::type;
110  using value_base_type =
111  std::result_of<decltype (&DynamicBufferConfig::value_base)(DynamicBufferConfig)>::type;
112 
113  ttl_type ttl_sum = 0;
114  ttl_type ttl_divisor = 0;
115  value_base_type value_base_sum = 0;
116  value_base_type value_base_divisor = 0;
117 
118  for (const auto& cfg : cfgs)
119  {
120  if (cfg.has_ack_required() && (!cfg_.has_ack_required() || cfg.ack_required()))
122  if (cfg.has_blackout_time() &&
123  (!cfg_.has_blackout_time() || cfg.blackout_time() < cfg_.blackout_time()))
125  if (cfg.has_max_queue() &&
126  (!cfg_.has_max_queue() || cfg.max_queue() > cfg_.max_queue()))
127  cfg_.set_max_queue(cfg.max_queue());
128  if (cfg.has_newest_first() && (!cfg_.has_newest_first() || cfg.newest_first()))
130 
131  if (cfg.has_ttl())
132  {
133  ttl_sum += cfg.ttl();
134  ++ttl_divisor;
135  }
136 
137  if (cfg.has_value_base())
138  {
139  value_base_sum += cfg.value_base();
140  ++value_base_divisor;
141  }
142  }
143 
144  if (ttl_divisor > 0)
145  cfg_.set_ttl(ttl_sum / ttl_divisor);
146  if (value_base_divisor > 0)
147  cfg_.set_value_base(value_base_sum / value_base_divisor);
148 
149  // avoid first access being in blackout
150  last_access_ -=
151  goby::time::convert_duration<typename Clock::duration>(cfg_.blackout_time_with_units());
152  }
153 
155  const goby::acomms::protobuf::DynamicBufferConfig& cfg() const { return cfg_; }
156 
163  Value& top(typename Clock::time_point reference = Clock::now(),
164  typename Clock::duration ack_timeout = std::chrono::microseconds(0))
165  {
166  for (auto& datum_pair : data_)
167  {
168  auto& datum_last_access = datum_pair.first;
169  if (datum_last_access == zero_point_ || datum_last_access + ack_timeout < reference)
170  {
171  last_access_ = reference;
172  datum_last_access = last_access_;
173  return datum_pair.second;
174  }
175  }
176  throw(DynamicBufferNoDataException("DynamicSubBuffer::top() found no data"));
177  }
178 
180  size_t top_size(typename Clock::time_point reference = Clock::now(),
181  typename Clock::duration ack_timeout = std::chrono::microseconds(0)) const
182  {
183  for (const auto& datum_pair : data_)
184  {
185  const auto& datum_last_access = datum_pair.first;
186  if (datum_last_access == zero_point_ || datum_last_access + ack_timeout < reference)
187  return data_size(datum_pair.second.data);
188  }
189  throw(DynamicBufferNoDataException("DynamicSubBuffer::top_size() found no data"));
190  }
191 
193  bool
194  all_waiting_for_ack(typename Clock::time_point reference = Clock::now(),
195  typename Clock::duration ack_timeout = std::chrono::microseconds(0)) const
196  {
197  for (const auto& datum_pair : data_)
198  {
199  const auto& datum_last_access = datum_pair.first;
200  if (datum_last_access == zero_point_ || datum_last_access + ack_timeout < reference)
201  return false;
202  }
203  return true;
204  }
205 
206  enum class ValueResult
207  {
209  EMPTY,
210  IN_BLACKOUT,
213  };
214 
221  std::pair<double, ValueResult>
222  top_value(typename Clock::time_point reference = Clock::now(),
223  size_type max_bytes = std::numeric_limits<size_type>::max(),
224  typename Clock::duration ack_timeout = std::chrono::microseconds(0)) const
225  {
226  // this order matters - some checks depend on previous checks
227  if (empty()) // no messages at all
228  return std::make_pair(-std::numeric_limits<double>::infinity(), ValueResult::EMPTY);
229  else if (in_blackout(reference)) // in blackout
230  return std::make_pair(-std::numeric_limits<double>::infinity(),
232  else if (all_waiting_for_ack(reference, ack_timeout)) // all messages waiting for ack
233  return std::make_pair(-std::numeric_limits<double>::infinity(),
235  else if (
236  top_size(reference, ack_timeout) >
237  max_bytes) // check if the top message size is greater than max_bytes - assumes !ALL_MESSAGES_WAITING_FOR_ACK so must go after that check
238  return std::make_pair(-std::numeric_limits<double>::infinity(),
240 
241  using Duration = std::chrono::microseconds;
242 
243  double dt = std::chrono::duration_cast<Duration>(reference - last_access_).count();
244  double ttl = goby::time::convert_duration<Duration>(cfg_.ttl_with_units()).count();
245  double v_b = cfg_.value_base();
246 
247  double v = v_b * dt / ttl;
248  return std::make_pair(v, ValueResult::VALUE_PROVIDED);
249  }
250 
254  bool in_blackout(typename Clock::time_point reference = Clock::now()) const
255  {
256  auto blackout =
257  goby::time::convert_duration<typename Clock::duration>(cfg_.blackout_time_with_units());
258 
259  return reference <= (last_access_ + blackout);
260  }
262  bool empty() const { return data_.empty(); }
263 
265  size_type size() const { return data_.size(); }
266 
268  void pop() { data_.pop_front(); }
269 
275  std::vector<Value> push(const T& t, typename Clock::time_point reference = Clock::now())
276  {
277  std::vector<Value> exceeded;
278 
279  if (cfg_.newest_first())
280  data_.push_front(std::make_pair(zero_point_, Value({reference, t})));
281  else
282  data_.push_back(std::make_pair(zero_point_, Value({reference, t})));
283 
284  while (data_.size() > cfg_.max_queue())
285  {
286  exceeded.push_back(data_.back().second);
287  data_.pop_back();
288  }
289  return exceeded;
290  }
291 
295  std::vector<Value> expire(typename Clock::time_point reference = Clock::now())
296  {
297  std::vector<Value> expired;
298 
299  auto ttl = goby::time::convert_duration<typename Clock::duration>(cfg_.ttl_with_units());
300  if (cfg_.newest_first())
301  {
302  while (!data_.empty() && reference > (data_.back().second.push_time + ttl))
303  {
304  expired.push_back(data_.back().second);
305  data_.pop_back();
306  }
307  }
308  else
309  {
310  while (!data_.empty() && reference > (data_.front().second.push_time + ttl))
311  {
312  expired.push_back(data_.front().second);
313  data_.pop_front();
314  }
315  }
316  return expired;
317  }
318 
323  bool erase(const Value& value)
324  {
325  // start at the beginning as we are most likely to want to erase elements we recently asked for with top()
326 
327  for (auto it = data_.begin(), end = data_.end(); it != end; ++it)
328  {
329  const auto& datum_pair = it->second;
330  if (datum_pair == value)
331  {
332  data_.erase(it);
333  return true;
334  }
335 
336  // if these are true, we're not going to find it so stop looking
337  if (cfg_.newest_first() && datum_pair.push_time < value.push_time)
338  break;
339  else if (!cfg_.newest_first() && datum_pair.push_time > value.push_time)
340  break;
341  }
342  return false;
343  }
344 
345  private:
347 
348  // pair of last send -> value
349  std::deque<std::pair<typename Clock::time_point, Value>> data_;
350  typename Clock::time_point last_access_{Clock::now()};
351 
352  typename Clock::time_point zero_point_{std::chrono::seconds(0)};
353 };
354 
356 template <typename T, typename Clock = goby::time::SteadyClock> class DynamicBuffer
357 {
358  public:
359  DynamicBuffer() : DynamicBuffer(++count_) {}
361  {
362  glog_priority_group_ = "goby::acomms::buffer::priority::" + std::to_string(id);
363  goby::glog.add_group(glog_priority_group_, util::Colors::yellow);
364  }
366 
367  using subbuffer_id_type = std::string;
370 
371  struct Value
372  {
375  typename Clock::time_point push_time;
376  T data;
377  };
378 
385  void create(modem_id_type dest_id, const subbuffer_id_type& sub_id,
387  {
388  create(dest_id, sub_id, std::vector<goby::acomms::protobuf::DynamicBufferConfig>(1, cfg));
389  }
390 
397  void create(modem_id_type dest_id, const subbuffer_id_type& sub_id,
398  const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
399  {
400  if (sub_.count(dest_id) && sub_.at(dest_id).count(sub_id))
401  throw(goby::Exception("Subbuffer ID: " + sub_id + " already exists."));
402 
403  sub_[dest_id].insert(std::make_pair(sub_id, DynamicSubBuffer<T, Clock>(cfgs)));
404  }
405 
411  void replace(modem_id_type dest_id, const subbuffer_id_type& sub_id,
413  {
414  replace(dest_id, sub_id, std::vector<goby::acomms::protobuf::DynamicBufferConfig>(1, cfg));
415  }
416 
422  void replace(modem_id_type dest_id, const subbuffer_id_type& sub_id,
423  const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
424  {
425  remove(dest_id, sub_id);
426  create(dest_id, sub_id, cfgs);
427  }
428 
434  void update(modem_id_type dest_id, const subbuffer_id_type& sub_id,
436  {
437  update(dest_id, sub_id, std::vector<goby::acomms::protobuf::DynamicBufferConfig>(1, cfg));
438  }
439 
445  void update(modem_id_type dest_id, const subbuffer_id_type& sub_id,
446  const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
447  {
448  auto it = sub_[dest_id].find(sub_id);
449  if (it != sub_[dest_id].end())
450  it->second.update(cfgs);
451  else
452  create(dest_id, sub_id, cfgs);
453  }
454 
459  void remove(modem_id_type dest_id, const subbuffer_id_type& sub_id)
460  {
461  sub_[dest_id].erase(sub_id);
462  }
463 
469  std::vector<Value> push(const Value& fvt)
470  {
471  std::vector<Value> exceeded;
472  auto sub_exceeded = sub(fvt.modem_id, fvt.subbuffer_id).push(fvt.data, fvt.push_time);
473  for (const auto& e : sub_exceeded)
474  exceeded.push_back({fvt.modem_id, fvt.subbuffer_id, e.push_time, e.data});
475  return exceeded;
476  }
477 
479  bool empty() const
480  {
481  for (const auto& sub_id_p : sub_)
482  {
483  for (const auto& sub_p : sub_id_p.second)
484  {
485  if (!sub_p.second.empty())
486  return false;
487  }
488  }
489 
490  return true;
491  }
492 
494  size_type size() const
495  {
496  size_type size = 0;
497  for (const auto& sub_id_p : sub_)
498  {
499  for (const auto& sub_p : sub_id_p.second) size += sub_p.second.size();
500  }
501  return size;
502  }
503 
511  size_type max_bytes = std::numeric_limits<size_type>::max(),
512  typename Clock::duration ack_timeout = std::chrono::microseconds(0))
513  {
514  using goby::glog;
515 
516  glog.is_debug1() &&
517  glog << group(glog_priority_group_) << "Starting priority contest (dest: "
518  << (dest_id == goby::acomms::QUERY_DESTINATION_ID ? std::string("?")
519  : std::to_string(dest_id))
520  << ", max_bytes: " << max_bytes << "):" << std::endl;
521 
522  typename std::unordered_map<subbuffer_id_type, DynamicSubBuffer<T, Clock>>::iterator
523  winning_sub;
524  double winning_value = -std::numeric_limits<double>::infinity();
525 
526  auto now = Clock::now();
527 
528  if (dest_id != goby::acomms::QUERY_DESTINATION_ID && !sub_.count(dest_id))
530  "DynamicBuffer::top() has no queues with this destination"));
531 
532  // if QUERY_DESTINATION_ID, search all subbuffers, otherwise just search the ones that were specified by dest_id
533  for (auto sub_id_it = (dest_id == goby::acomms::QUERY_DESTINATION_ID) ? sub_.begin()
534  : sub_.find(dest_id),
535  sub_id_end = (dest_id == goby::acomms::QUERY_DESTINATION_ID)
536  ? sub_.end()
537  : ++sub_.find(dest_id);
538  sub_id_it != sub_id_end; ++sub_id_it)
539  {
540  for (auto sub_it = sub_id_it->second.begin(), sub_end = sub_id_it->second.end();
541  sub_it != sub_end; ++sub_it)
542  {
543  double value;
545  std::tie(value, result) = sub_it->second.top_value(now, max_bytes, ack_timeout);
546 
547  std::string value_or_reason;
548  switch (result)
549  {
551  value_or_reason = std::to_string(value);
552  break;
553 
555  value_or_reason = "empty";
556  break;
557 
559  value_or_reason = "blackout";
560  break;
561 
563  value_or_reason = "too large";
564  break;
565 
567  value_or_reason = "ack wait";
568  break;
569  }
570 
571  glog.is_debug1() && glog << group(glog_priority_group_) << "\t" << sub_it->first
572  << " [dest: " << sub_id_it->first
573  << ", n: " << sub_it->second.size()
574  << "]: " << value_or_reason << std::endl;
575 
576  if (value > winning_value)
577  {
578  winning_value = value;
579  winning_sub = sub_it;
580  dest_id = sub_id_it->first;
581  }
582  }
583  }
584 
585  if (winning_value == -std::numeric_limits<double>::infinity())
587  "DynamicBuffer::top() has no queue with a winning value"));
588 
589  const auto& top_p = winning_sub->second.top(now, ack_timeout);
590  glog.is_debug1() && glog << group(glog_priority_group_) << "Winner: " << winning_sub->first
591  << " (" << data_size(top_p.data) << "B)" << std::endl;
592 
593  return {dest_id, winning_sub->first, top_p.push_time, top_p.data};
594  }
595 
601  bool erase(const Value& value)
602  {
603  return sub(value.modem_id, value.subbuffer_id).erase({value.push_time, value.data});
604  }
605 
609  std::vector<Value> expire()
610  {
611  auto now = Clock::now();
612  std::vector<Value> expired;
613  for (auto& sub_id_p : sub_)
614  {
615  for (auto& sub_p : sub_id_p.second)
616  {
617  auto sub_expired = sub_p.second.expire(now);
618  for (const auto& e : sub_expired)
619  expired.push_back({sub_id_p.first, sub_p.first, e.push_time, e.data});
620  }
621  }
622  return expired;
623  }
624 
629  {
630  if (!sub_.count(dest_id) || !sub_.at(dest_id).count(sub_id))
631  throw(goby::Exception("Subbuffer ID: " + sub_id +
632  " does not exist, must call create(...) first."));
633  return sub_.at(dest_id).at(sub_id);
634  }
635 
636  private:
637  // destination -> subbuffer id (group/type) -> subbuffer
638  std::map<modem_id_type, std::unordered_map<subbuffer_id_type, DynamicSubBuffer<T, Clock>>> sub_;
639 
640  std::string glog_priority_group_;
641  static std::atomic<int> count_;
642 
643 }; // namespace acomms
644 
645 template <typename T, typename Clock> std::atomic<int> DynamicBuffer<T, Clock>::count_(0);
646 
647 } // namespace acomms
648 } // namespace goby
649 
650 #endif
goby::acomms::DynamicBuffer::empty
bool empty() const
Is this buffer empty (that is, are all subbuffers empty)?
Definition: dynamic_buffer.h:479
goby::acomms::DynamicBufferNoDataException
Definition: dynamic_buffer.h:52
goby::acomms::DynamicSubBuffer::empty
bool empty() const
Returns if this queue is empty.
Definition: dynamic_buffer.h:262
goby::acomms::DynamicSubBuffer::DynamicSubBuffer
DynamicSubBuffer(const goby::acomms::protobuf::DynamicBufferConfig &cfg)
Create a subbuffer with the given configuration.
Definition: dynamic_buffer.h:78
goby::acomms::DynamicSubBuffer::Value::push_time
Clock::time_point push_time
Definition: dynamic_buffer.h:72
goby::acomms::DynamicBuffer::Value
Definition: dynamic_buffer.h:371
goby::acomms::protobuf::DynamicBufferConfig::set_value_base
void set_value_base(double value)
Definition: buffer.pb.h:416
goby::acomms::DynamicBuffer::push
std::vector< Value > push(const Value &fvt)
Push a new message to the buffer.
Definition: dynamic_buffer.h:469
goby::acomms::protobuf::DynamicBufferConfig::max_queue
::google::protobuf::uint32 max_queue() const
Definition: buffer.pb.h:340
goby
The global namespace for the Goby project.
Definition: acomms_constants.h:33
goby::acomms::DynamicBuffer::erase
bool erase(const Value &value)
Erase a value.
Definition: dynamic_buffer.h:601
goby::acomms::DynamicSubBuffer::ValueResult::IN_BLACKOUT
@ IN_BLACKOUT
goby::acomms::DynamicSubBuffer::top
Value & top(typename Clock::time_point reference=Clock::now(), typename Clock::duration ack_timeout=std::chrono::microseconds(0))
Returns the value at the top of the queue that hasn't been sent wihin ack_timeout.
Definition: dynamic_buffer.h:163
goby::acomms::protobuf::DynamicBufferConfig::ttl
double ttl() const
Definition: buffer.pb.h:388
goby::acomms::DynamicBuffer::sub
DynamicSubBuffer< T, Clock > & sub(modem_id_type dest_id, const subbuffer_id_type &sub_id)
Reference a given subbuffer.
Definition: dynamic_buffer.h:628
goby::acomms::protobuf::DynamicBufferConfig::set_ttl
void set_ttl(double value)
Definition: buffer.pb.h:392
goby::acomms::DynamicBuffer::remove
void remove(modem_id_type dest_id, const subbuffer_id_type &sub_id)
Remove an existing subbuffer.
Definition: dynamic_buffer.h:459
goby::acomms::DynamicSubBuffer
Represents a time-dependent priority queue for a single group of messages (e.g. for a single DCCL ID)
Definition: dynamic_buffer.h:65
goby::acomms::DynamicSubBuffer::update
void update(const std::vector< goby::acomms::protobuf::DynamicBufferConfig > &cfgs)
Update the configurations without clearing the buffer.
Definition: dynamic_buffer.h:99
goby::acomms::DynamicBuffer::Value::data
T data
Definition: dynamic_buffer.h:376
goby::acomms::DynamicBuffer::Value::subbuffer_id
subbuffer_id_type subbuffer_id
Definition: dynamic_buffer.h:374
goby::util::e
constexpr T e
Definition: constants.h:35
goby::acomms::DynamicBuffer::create
void create(modem_id_type dest_id, const subbuffer_id_type &sub_id, const std::vector< goby::acomms::protobuf::DynamicBufferConfig > &cfgs)
Create a new subbuffer merging the given configuration (See DynamicSubBuffer() for details)
Definition: dynamic_buffer.h:397
goby::acomms::DynamicSubBuffer< T, goby::time::SteadyClock >::ValueResult
ValueResult
Definition: dynamic_buffer.h:206
group
goby::util::logger::GroupSetter group(std::string n)
Definition: logger_manipulators.h:134
goby::acomms::DynamicSubBuffer::top_size
size_t top_size(typename Clock::time_point reference=Clock::now(), typename Clock::duration ack_timeout=std::chrono::microseconds(0)) const
Returns the size (in bytes) of the top of the queue that hasn't been sent within ack_timeout.
Definition: dynamic_buffer.h:180
goby::acomms::DynamicBuffer::Value::push_time
Clock::time_point push_time
Definition: dynamic_buffer.h:375
goby::acomms::protobuf::DynamicBufferConfig
Definition: buffer.pb.h:75
goby::acomms::DynamicSubBuffer::expire
std::vector< Value > expire(typename Clock::time_point reference=Clock::now())
Erase any values that have exceeded their time-to-live.
Definition: dynamic_buffer.h:295
goby::acomms::DynamicBuffer::update
void update(modem_id_type dest_id, const subbuffer_id_type &sub_id, const std::vector< goby::acomms::protobuf::DynamicBufferConfig > &cfgs)
Update an existing subbuffer without removing the messsages (or creates the buffer if it doesn't alre...
Definition: dynamic_buffer.h:445
goby::acomms::protobuf::DynamicBufferConfig::value_base
double value_base() const
Definition: buffer.pb.h:412
goby::acomms::DynamicSubBuffer::push
std::vector< Value > push(const T &t, typename Clock::time_point reference=Clock::now())
Push a value to the queue.
Definition: dynamic_buffer.h:275
goby::acomms::DynamicSubBuffer< T, goby::time::SteadyClock >::size_type
typename std::deque< T >::size_type size_type
Definition: dynamic_buffer.h:68
goby::util::FlexOstream::add_group
void add_group(const std::string &name, Colors::Color color=Colors::nocolor, const std::string &description="")
Add another group to the logger. A group provides related manipulator for categorizing log messages.
goby::acomms::DynamicBuffer::Value::modem_id
modem_id_type modem_id
Definition: dynamic_buffer.h:373
goby::acomms::DynamicBuffer::DynamicBuffer
DynamicBuffer()
Definition: dynamic_buffer.h:359
goby::acomms::DynamicSubBuffer::Value::operator==
bool operator==(const Value &a) const
Definition: dynamic_buffer.h:74
goby::acomms::protobuf::DynamicBufferConfig::newest_first
bool newest_first() const
Definition: buffer.pb.h:364
goby::util::FlexOstream::is_debug1
bool is_debug1()
Definition: flex_ostream.h:84
goby::acomms::protobuf::DynamicBufferConfig::ack_required
bool ack_required() const
Definition: buffer.pb.h:292
goby::acomms::DynamicBuffer::expire
std::vector< Value > expire()
Erase any values that have exceeded their time-to-live.
Definition: dynamic_buffer.h:609
goby::acomms::protobuf::DynamicBufferConfig::ttl_with_units
Quantity ttl_with_units() const
Definition: buffer.pb.h:235
goby::acomms::DynamicBuffer::replace
void replace(modem_id_type dest_id, const subbuffer_id_type &sub_id, const goby::acomms::protobuf::DynamicBufferConfig &cfg)
Replace an existing subbuffer with the given configuration (any messages in the subbuffer will be era...
Definition: dynamic_buffer.h:411
goby::acomms::DynamicSubBuffer::ValueResult::NEXT_MESSAGE_TOO_LARGE
@ NEXT_MESSAGE_TOO_LARGE
goby::acomms::DynamicSubBuffer::ValueResult::ALL_MESSAGES_WAITING_FOR_ACK
@ ALL_MESSAGES_WAITING_FOR_ACK
goby::acomms::DynamicSubBuffer::~DynamicSubBuffer
~DynamicSubBuffer()
Definition: dynamic_buffer.h:96
buffer.pb.h
goby::acomms::protobuf::DynamicBufferConfig::Clear
void Clear() final
goby::acomms::DynamicSubBuffer::Value::data
T data
Definition: dynamic_buffer.h:73
goby::acomms::DynamicBuffer::~DynamicBuffer
~DynamicBuffer()
Definition: dynamic_buffer.h:365
steady_clock.h
goby::acomms::protobuf::DynamicBufferConfig::has_max_queue
bool has_max_queue() const
Definition: buffer.pb.h:327
to_string
NLOHMANN_BASIC_JSON_TPL_DECLARATION std::string to_string(const NLOHMANN_BASIC_JSON_TPL &j)
user-defined to_string function for JSON values
Definition: json.hpp:24301
goby::acomms::DynamicBuffer::top
Value top(modem_id_type dest_id=goby::acomms::QUERY_DESTINATION_ID, size_type max_bytes=std::numeric_limits< size_type >::max(), typename Clock::duration ack_timeout=std::chrono::microseconds(0))
Returns the top value in a priority contest between all subbuffers.
Definition: dynamic_buffer.h:510
goby::acomms::protobuf::DynamicBufferConfig::has_newest_first
bool has_newest_first() const
Definition: buffer.pb.h:351
goby::acomms::DynamicBuffer< goby::middleware::protobuf::SerializerTransporterMessage >::size_type
typename DynamicSubBuffer< goby::middleware::protobuf::SerializerTransporterMessage, goby::time::SteadyClock >::size_type size_type
Definition: dynamic_buffer.h:368
goby::acomms::DynamicSubBuffer::DynamicSubBuffer
DynamicSubBuffer(const std::vector< goby::acomms::protobuf::DynamicBufferConfig > &cfgs)
Create a subbuffer merging two or more configuration objects.
Definition: dynamic_buffer.h:91
jwt::json::type
type
Generic JSON types used in JWTs.
Definition: jwt.h:2071
goby::acomms::QUERY_DESTINATION_ID
constexpr int QUERY_DESTINATION_ID
special modem id used internally to goby-acomms for indicating that the MAC layer (amac) is agnostic ...
Definition: acomms_constants.h:46
goby::acomms::DynamicBufferNoDataException::~DynamicBufferNoDataException
~DynamicBufferNoDataException()
Definition: dynamic_buffer.h:59
goby::acomms::DynamicBuffer::replace
void replace(modem_id_type dest_id, const subbuffer_id_type &sub_id, const std::vector< goby::acomms::protobuf::DynamicBufferConfig > &cfgs)
Replace an existing subbuffer merging the given configuration (See DynamicSubBuffer() for details)
Definition: dynamic_buffer.h:422
goby::acomms::DynamicSubBuffer::size
size_type size() const
Retrieves the size of the queue.
Definition: dynamic_buffer.h:265
convert.h
goby::acomms::data_size
size_t data_size(const Container &c)
Definition: dynamic_buffer.h:62
goby::acomms::DynamicBuffer
Represents a time-dependent priority queue for several groups of messages (multiple DynamicSubBuffers...
Definition: dynamic_buffer.h:356
goby::acomms::DynamicBuffer::DynamicBuffer
DynamicBuffer(int id)
Definition: dynamic_buffer.h:360
goby::acomms::protobuf::DynamicBufferConfig::has_ack_required
bool has_ack_required() const
Definition: buffer.pb.h:279
debug_logger.h
goby::acomms::DynamicBuffer::update
void update(modem_id_type dest_id, const subbuffer_id_type &sub_id, const goby::acomms::protobuf::DynamicBufferConfig &cfg)
Update an existing subbuffer without removing the messsages.
Definition: dynamic_buffer.h:434
goby::acomms::DynamicSubBuffer::all_waiting_for_ack
bool all_waiting_for_ack(typename Clock::time_point reference=Clock::now(), typename Clock::duration ack_timeout=std::chrono::microseconds(0)) const
returns true if all messages have been sent within ack_timeout of the reference provided and thus non...
Definition: dynamic_buffer.h:194
goby::acomms::protobuf::DynamicBufferConfig::blackout_time_with_units
Quantity blackout_time_with_units() const
Definition: buffer.pb.h:220
goby::acomms::protobuf::DynamicBufferConfig::set_newest_first
void set_newest_first(bool value)
Definition: buffer.pb.h:368
goby::acomms::DynamicSubBuffer::ValueResult::VALUE_PROVIDED
@ VALUE_PROVIDED
goby::acomms::DynamicBuffer< goby::middleware::protobuf::SerializerTransporterMessage >::subbuffer_id_type
std::string subbuffer_id_type
Definition: dynamic_buffer.h:367
goby::acomms::protobuf::DynamicBufferConfig::has_blackout_time
bool has_blackout_time() const
Definition: buffer.pb.h:303
goby::acomms::DynamicBuffer::create
void create(modem_id_type dest_id, const subbuffer_id_type &sub_id, const goby::acomms::protobuf::DynamicBufferConfig &cfg)
Create a new subbuffer with the given configuration.
Definition: dynamic_buffer.h:385
goby::acomms::protobuf::DynamicBufferConfig::set_blackout_time
void set_blackout_time(double value)
Definition: buffer.pb.h:320
goby::acomms::DynamicSubBuffer::cfg
const goby::acomms::protobuf::DynamicBufferConfig & cfg() const
Return the aggregate configuration.
Definition: dynamic_buffer.h:155
goby::acomms::DynamicBufferNoDataException::DynamicBufferNoDataException
DynamicBufferNoDataException(const std::string &reason)
Definition: dynamic_buffer.h:55
goby::acomms::DynamicSubBuffer::pop
void pop()
Pop the value on the top of the queue.
Definition: dynamic_buffer.h:268
goby::acomms::DynamicSubBuffer::Value
Definition: dynamic_buffer.h:70
goby::Exception
simple exception class for goby applications
Definition: exception.h:34
exception.h
goby::glog
util::FlexOstream glog
Access the Goby logger through this object.
goby::acomms::protobuf::DynamicBufferConfig::set_ack_required
void set_ack_required(bool value)
Definition: buffer.pb.h:296
google::protobuf::MessageLite::SerializeAsString
string SerializeAsString() const
goby::acomms::protobuf::DynamicBufferConfig::set_max_queue
void set_max_queue(::google::protobuf::uint32 value)
Definition: buffer.pb.h:344
acomms_constants.h
goby::acomms::protobuf::DynamicBufferConfig::has_value_base
bool has_value_base() const
Definition: buffer.pb.h:399
goby::util::Colors::yellow
@ yellow
Definition: term_color.h:118
goby::acomms::DynamicSubBuffer::in_blackout
bool in_blackout(typename Clock::time_point reference=Clock::now()) const
Returns if buffer is in blackout.
Definition: dynamic_buffer.h:254
goby::acomms::DynamicSubBuffer::top_value
std::pair< double, ValueResult > top_value(typename Clock::time_point reference=Clock::now(), size_type max_bytes=std::numeric_limits< size_type >::max(), typename Clock::duration ack_timeout=std::chrono::microseconds(0)) const
Provides the numerical priority value based on this subbuffer's base priority, time-to-live (ttl) and...
Definition: dynamic_buffer.h:222
goby::acomms::protobuf::DynamicBufferConfig::has_ttl
bool has_ttl() const
Definition: buffer.pb.h:375
goby::acomms::protobuf::operator==
bool operator==(const ModemTransmission &a, const ModemTransmission &b)
Definition: mac_manager.h:145
goby::acomms::DynamicSubBuffer::ValueResult::EMPTY
@ EMPTY
goby::acomms::DynamicBuffer::size
size_type size() const
Size of the buffer (that is, sum of the subbuffer sizes)
Definition: dynamic_buffer.h:494
int
goby::acomms::DynamicSubBuffer::erase
bool erase(const Value &value)
Erase a value.
Definition: dynamic_buffer.h:323
goby::acomms::protobuf::DynamicBufferConfig::blackout_time
double blackout_time() const
Definition: buffer.pb.h:316