Goby3 3.2.3
2025.05.13
Loading...
Searching...
No Matches
dynamic_buffer.h
Go to the documentation of this file.
1// Copyright 2019-2024:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6//
7//
8// This file is part of the Goby Underwater Autonomy Project Libraries
9// ("The Goby Libraries").
10//
11// The Goby Libraries are free software: you can redistribute them and/or modify
12// them under the terms of the GNU Lesser General Public License as published by
13// the Free Software Foundation, either version 2.1 of the License, or
14// (at your option) any later version.
15//
16// The Goby Libraries are distributed in the hope that they will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU Lesser General Public License for more details.
20//
21// You should have received a copy of the GNU Lesser General Public License
22// along with Goby. If not, see <http://www.gnu.org/licenses/>.
23
24#ifndef GOBY_ACOMMS_BUFFER_DYNAMIC_BUFFER_H
25#define GOBY_ACOMMS_BUFFER_DYNAMIC_BUFFER_H
26
27#include <deque>
28#include <type_traits>
29#include <unordered_map>
30#include <vector>
31
34#include "goby/exception.h"
35#include "goby/time/convert.h"
38
39namespace goby
40{
41namespace acomms
42{
43namespace protobuf
44{
45inline bool operator==(const DynamicBufferConfig& a, const DynamicBufferConfig& b)
46{
47 return a.SerializeAsString() == b.SerializeAsString();
48}
49
50} // namespace protobuf
51
53{
54 public:
55 DynamicBufferNoDataException(const std::string& reason)
56 : goby::Exception("No queues have data available: " + reason)
57 {
58 }
60};
61
62template <typename Container> size_t data_size(const Container& c) { return c.size(); }
63
65template <typename T, typename Clock = goby::time::SteadyClock> class DynamicSubBuffer
66{
67 public:
68 using size_type = typename std::deque<T>::size_type;
69
70 struct Value
71 {
72 typename Clock::time_point push_time;
74 bool operator==(const Value& a) const { return a.push_time == push_time && a.data == data; }
75 };
76
79 : DynamicSubBuffer(std::vector<goby::acomms::protobuf::DynamicBufferConfig>(1, cfg))
80 {
81 }
82
91 DynamicSubBuffer(const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
92 {
93 update(cfgs);
94 }
95
97
99 void update(const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
100 {
102 if (cfgs.empty())
103 throw(goby::Exception("Configuration vector must not be empty for DynamicSubBuffer"));
104
105 cfg_.Clear();
106
107 // extract these types from the Protobuf message
108 using ttl_type =
109 std::result_of<decltype (&DynamicBufferConfig::ttl)(DynamicBufferConfig)>::type;
110 using value_base_type =
111 std::result_of<decltype (&DynamicBufferConfig::value_base)(DynamicBufferConfig)>::type;
112
113 ttl_type ttl_sum = 0;
114 ttl_type ttl_divisor = 0;
115 value_base_type value_base_sum = 0;
116 value_base_type value_base_divisor = 0;
117
118 for (const auto& cfg : cfgs)
119 {
120 if (cfg.has_ack_required() && (!cfg_.has_ack_required() || cfg.ack_required()))
122 if (cfg.has_blackout_time() &&
123 (!cfg_.has_blackout_time() || cfg.blackout_time() < cfg_.blackout_time()))
125 if (cfg.has_max_queue() &&
126 (!cfg_.has_max_queue() || cfg.max_queue() > cfg_.max_queue()))
128 if (cfg.has_newest_first() && (!cfg_.has_newest_first() || cfg.newest_first()))
130
131 if (cfg.has_ttl())
132 {
133 ttl_sum += cfg.ttl();
134 ++ttl_divisor;
135 }
136
137 if (cfg.has_value_base())
138 {
139 value_base_sum += cfg.value_base();
140 ++value_base_divisor;
141 }
142 }
143
144 if (ttl_divisor > 0)
145 cfg_.set_ttl(ttl_sum / ttl_divisor);
146 if (value_base_divisor > 0)
147 cfg_.set_value_base(value_base_sum / value_base_divisor);
148
149 // avoid first access being in blackout
150 last_access_ -=
151 goby::time::convert_duration<typename Clock::duration>(cfg_.blackout_time_with_units());
152 }
153
155 const goby::acomms::protobuf::DynamicBufferConfig& cfg() const { return cfg_; }
156
163 Value& top(typename Clock::time_point reference = Clock::now(),
164 typename Clock::duration ack_timeout = std::chrono::microseconds(0))
165 {
166 for (auto& datum_pair : data_)
167 {
168 auto& datum_last_access = datum_pair.first;
169 if (datum_last_access == zero_point_ || datum_last_access + ack_timeout < reference)
170 {
171 last_access_ = reference;
172 datum_last_access = last_access_;
173 return datum_pair.second;
174 }
175 }
176 throw(DynamicBufferNoDataException("DynamicSubBuffer::top() found no data"));
177 }
178
180 size_t top_size(typename Clock::time_point reference = Clock::now(),
181 typename Clock::duration ack_timeout = std::chrono::microseconds(0)) const
182 {
183 for (const auto& datum_pair : data_)
184 {
185 const auto& datum_last_access = datum_pair.first;
186 if (datum_last_access == zero_point_ || datum_last_access + ack_timeout < reference)
187 return data_size(datum_pair.second.data);
188 }
189 throw(DynamicBufferNoDataException("DynamicSubBuffer::top_size() found no data"));
190 }
191
193 bool
194 all_waiting_for_ack(typename Clock::time_point reference = Clock::now(),
195 typename Clock::duration ack_timeout = std::chrono::microseconds(0)) const
196 {
197 for (const auto& datum_pair : data_)
198 {
199 const auto& datum_last_access = datum_pair.first;
200 if (datum_last_access == zero_point_ || datum_last_access + ack_timeout < reference)
201 return false;
202 }
203 return true;
204 }
205
214
221 std::pair<double, ValueResult>
222 top_value(typename Clock::time_point reference = Clock::now(),
223 size_type max_bytes = std::numeric_limits<size_type>::max(),
224 typename Clock::duration ack_timeout = std::chrono::microseconds(0)) const
225 {
226 // this order matters - some checks depend on previous checks
227 if (empty()) // no messages at all
228 return std::make_pair(-std::numeric_limits<double>::infinity(), ValueResult::EMPTY);
229 else if (in_blackout(reference)) // in blackout
230 return std::make_pair(-std::numeric_limits<double>::infinity(),
232 else if (all_waiting_for_ack(reference, ack_timeout)) // all messages waiting for ack
233 return std::make_pair(-std::numeric_limits<double>::infinity(),
235 else if (
236 top_size(reference, ack_timeout) >
237 max_bytes) // check if the top message size is greater than max_bytes - assumes !ALL_MESSAGES_WAITING_FOR_ACK so must go after that check
238 return std::make_pair(-std::numeric_limits<double>::infinity(),
240
241 using Duration = std::chrono::microseconds;
242
243 double dt = std::chrono::duration_cast<Duration>(reference - last_access_).count();
244 double ttl = goby::time::convert_duration<Duration>(cfg_.ttl_with_units()).count();
245 double v_b = cfg_.value_base();
246
247 double v = v_b * dt / ttl;
248 return std::make_pair(v, ValueResult::VALUE_PROVIDED);
249 }
250
254 bool in_blackout(typename Clock::time_point reference = Clock::now()) const
255 {
256 auto blackout =
257 goby::time::convert_duration<typename Clock::duration>(cfg_.blackout_time_with_units());
258
259 return reference <= (last_access_ + blackout);
260 }
262 bool empty() const { return data_.empty(); }
263
265 size_type size() const { return data_.size(); }
266
268 void pop() { data_.pop_front(); }
269
275 std::vector<Value> push(const T& t, typename Clock::time_point reference = Clock::now())
276 {
277 std::vector<Value> exceeded;
278
279 if (cfg_.newest_first())
280 data_.push_front(std::make_pair(zero_point_, Value({reference, t})));
281 else
282 data_.push_back(std::make_pair(zero_point_, Value({reference, t})));
283
284 while (data_.size() > cfg_.max_queue())
285 {
286 exceeded.push_back(data_.back().second);
287 data_.pop_back();
288 }
289 return exceeded;
290 }
291
295 std::vector<Value> expire(typename Clock::time_point reference = Clock::now())
296 {
297 std::vector<Value> expired;
298
299 auto ttl = goby::time::convert_duration<typename Clock::duration>(cfg_.ttl_with_units());
300 if (cfg_.newest_first())
301 {
302 while (!data_.empty() && reference > (data_.back().second.push_time + ttl))
303 {
304 expired.push_back(data_.back().second);
305 data_.pop_back();
306 }
307 }
308 else
309 {
310 while (!data_.empty() && reference > (data_.front().second.push_time + ttl))
311 {
312 expired.push_back(data_.front().second);
313 data_.pop_front();
314 }
315 }
316 return expired;
317 }
318
323 bool erase(const Value& value)
324 {
325 // start at the beginning as we are most likely to want to erase elements we recently asked for with top()
326
327 for (auto it = data_.begin(), end = data_.end(); it != end; ++it)
328 {
329 const auto& datum_pair = it->second;
330 if (datum_pair == value)
331 {
332 data_.erase(it);
333 return true;
334 }
335
336 // if these are true, we're not going to find it so stop looking
337 if (cfg_.newest_first() && datum_pair.push_time < value.push_time)
338 break;
339 else if (!cfg_.newest_first() && datum_pair.push_time > value.push_time)
340 break;
341 }
342 return false;
343 }
344
345 private:
347
348 // pair of last send -> value
349 std::deque<std::pair<typename Clock::time_point, Value>> data_;
350 typename Clock::time_point last_access_{Clock::now()};
351
352 typename Clock::time_point zero_point_{std::chrono::seconds(0)};
353};
354
356template <typename T, typename Clock = goby::time::SteadyClock> class DynamicBuffer
357{
358 public:
361 {
362 glog_priority_group_ = "goby::acomms::buffer::priority::" + std::to_string(id);
363 goby::glog.add_group(glog_priority_group_, util::Colors::yellow);
364 }
366
367 using subbuffer_id_type = std::string;
369 using modem_id_type = int;
370
371 struct Value
372 {
375 typename Clock::time_point push_time;
377 };
378
385 void create(modem_id_type dest_id, const subbuffer_id_type& sub_id,
387 {
388 create(dest_id, sub_id, std::vector<goby::acomms::protobuf::DynamicBufferConfig>(1, cfg));
389 }
390
397 void create(modem_id_type dest_id, const subbuffer_id_type& sub_id,
398 const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
399 {
400 if (sub_.count(dest_id) && sub_.at(dest_id).count(sub_id))
401 throw(goby::Exception("Subbuffer ID: " + sub_id + " already exists."));
402
403 sub_[dest_id].insert(std::make_pair(sub_id, DynamicSubBuffer<T, Clock>(cfgs)));
404 }
405
411 void replace(modem_id_type dest_id, const subbuffer_id_type& sub_id,
413 {
414 replace(dest_id, sub_id, std::vector<goby::acomms::protobuf::DynamicBufferConfig>(1, cfg));
415 }
416
422 void replace(modem_id_type dest_id, const subbuffer_id_type& sub_id,
423 const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
424 {
425 remove(dest_id, sub_id);
426 create(dest_id, sub_id, cfgs);
427 }
428
434 void update(modem_id_type dest_id, const subbuffer_id_type& sub_id,
436 {
437 update(dest_id, sub_id, std::vector<goby::acomms::protobuf::DynamicBufferConfig>(1, cfg));
438 }
439
445 void update(modem_id_type dest_id, const subbuffer_id_type& sub_id,
446 const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
447 {
448 auto it = sub_[dest_id].find(sub_id);
449 if (it != sub_[dest_id].end())
450 it->second.update(cfgs);
451 else
452 create(dest_id, sub_id, cfgs);
453 }
454
459 void remove(modem_id_type dest_id, const subbuffer_id_type& sub_id)
460 {
461 sub_[dest_id].erase(sub_id);
462 }
463
469 std::vector<Value> push(const Value& fvt)
470 {
471 std::vector<Value> exceeded;
472 auto sub_exceeded = sub(fvt.modem_id, fvt.subbuffer_id).push(fvt.data, fvt.push_time);
473 for (const auto& e : sub_exceeded)
474 exceeded.push_back({fvt.modem_id, fvt.subbuffer_id, e.push_time, e.data});
475 return exceeded;
476 }
477
479 bool empty() const
480 {
481 for (const auto& sub_id_p : sub_)
482 {
483 for (const auto& sub_p : sub_id_p.second)
484 {
485 if (!sub_p.second.empty())
486 return false;
487 }
488 }
489
490 return true;
491 }
492
495 {
496 size_type size = 0;
497 for (const auto& sub_id_p : sub_)
498 {
499 for (const auto& sub_p : sub_id_p.second) size += sub_p.second.size();
500 }
501 return size;
502 }
503
511 size_type max_bytes = std::numeric_limits<size_type>::max(),
512 typename Clock::duration ack_timeout = std::chrono::microseconds(0))
513 {
514 using goby::glog;
515
516 glog.is_debug1() &&
517 glog << group(glog_priority_group_) << "Starting priority contest (dest: "
518 << (dest_id == goby::acomms::QUERY_DESTINATION_ID ? std::string("?")
519 : std::to_string(dest_id))
520 << ", max_bytes: " << max_bytes << "):" << std::endl;
521
522 typename std::unordered_map<subbuffer_id_type, DynamicSubBuffer<T, Clock>>::iterator
523 winning_sub;
524 double winning_value = -std::numeric_limits<double>::infinity();
525
526 auto now = Clock::now();
527
528 if (dest_id != goby::acomms::QUERY_DESTINATION_ID && !sub_.count(dest_id))
530 "DynamicBuffer::top() has no queues with this destination"));
531
532 // if QUERY_DESTINATION_ID, search all subbuffers, otherwise just search the ones that were specified by dest_id
533 for (auto sub_id_it = (dest_id == goby::acomms::QUERY_DESTINATION_ID) ? sub_.begin()
534 : sub_.find(dest_id),
535 sub_id_end = (dest_id == goby::acomms::QUERY_DESTINATION_ID)
536 ? sub_.end()
537 : ++sub_.find(dest_id);
538 sub_id_it != sub_id_end; ++sub_id_it)
539 {
540 for (auto sub_it = sub_id_it->second.begin(), sub_end = sub_id_it->second.end();
541 sub_it != sub_end; ++sub_it)
542 {
543 double value;
545 std::tie(value, result) = sub_it->second.top_value(now, max_bytes, ack_timeout);
546
547 std::string value_or_reason;
548 switch (result)
549 {
551 value_or_reason = std::to_string(value);
552 break;
553
555 value_or_reason = "empty";
556 break;
557
559 value_or_reason = "blackout";
560 break;
561
563 value_or_reason = "too large";
564 break;
565
567 value_or_reason = "ack wait";
568 break;
569 }
570
571 glog.is_debug1() && glog << group(glog_priority_group_) << "\t" << sub_it->first
572 << " [dest: " << sub_id_it->first
573 << ", n: " << sub_it->second.size()
574 << "]: " << value_or_reason << std::endl;
575
576 if (value > winning_value)
577 {
578 winning_value = value;
579 winning_sub = sub_it;
580 dest_id = sub_id_it->first;
581 }
582 }
583 }
584
585 if (winning_value == -std::numeric_limits<double>::infinity())
587 "DynamicBuffer::top() has no queue with a winning value"));
588
589 const auto& top_p = winning_sub->second.top(now, ack_timeout);
590 glog.is_debug1() && glog << group(glog_priority_group_) << "Winner: " << winning_sub->first
591 << " (" << data_size(top_p.data) << "B)" << std::endl;
592
593 return {dest_id, winning_sub->first, top_p.push_time, top_p.data};
594 }
595
601 bool erase(const Value& value)
602 {
603 return sub(value.modem_id, value.subbuffer_id).erase({value.push_time, value.data});
604 }
605
609 std::vector<Value> expire()
610 {
611 auto now = Clock::now();
612 std::vector<Value> expired;
613 for (auto& sub_id_p : sub_)
614 {
615 for (auto& sub_p : sub_id_p.second)
616 {
617 auto sub_expired = sub_p.second.expire(now);
618 for (const auto& e : sub_expired)
619 expired.push_back({sub_id_p.first, sub_p.first, e.push_time, e.data});
620 }
621 }
622 return expired;
623 }
624
629 {
630 if (!sub_.count(dest_id) || !sub_.at(dest_id).count(sub_id))
631 throw(goby::Exception("Subbuffer ID: " + sub_id +
632 " does not exist, must call create(...) first."));
633 return sub_.at(dest_id).at(sub_id);
634 }
635
636 private:
637 // destination -> subbuffer id (group/type) -> subbuffer
638 std::map<modem_id_type, std::unordered_map<subbuffer_id_type, DynamicSubBuffer<T, Clock>>> sub_;
639
640 std::string glog_priority_group_;
641 static std::atomic<int> count_;
642
643}; // namespace acomms
644
645template <typename T, typename Clock> std::atomic<int> DynamicBuffer<T, Clock>::count_(0);
646
647} // namespace acomms
648} // namespace goby
649
650#endif
simple exception class for goby applications
Definition exception.h:35
DynamicBufferNoDataException(const std::string &reason)
Represents a time-dependent priority queue for several groups of messages (multiple DynamicSubBuffers...
bool empty() const
Is this buffer empty (that is, are all subbuffers empty)?
std::vector< Value > push(const Value &fvt)
Push a new message to the buffer.
void replace(modem_id_type dest_id, const subbuffer_id_type &sub_id, const std::vector< goby::acomms::protobuf::DynamicBufferConfig > &cfgs)
Replace an existing subbuffer merging the given configuration (See DynamicSubBuffer() for details)
std::vector< Value > expire()
Erase any values that have exceeded their time-to-live.
typename DynamicSubBuffer< T, Clock >::size_type size_type
void update(modem_id_type dest_id, const subbuffer_id_type &sub_id, const std::vector< goby::acomms::protobuf::DynamicBufferConfig > &cfgs)
Update an existing subbuffer without removing the messsages (or creates the buffer if it doesn't alre...
void update(modem_id_type dest_id, const subbuffer_id_type &sub_id, const goby::acomms::protobuf::DynamicBufferConfig &cfg)
Update an existing subbuffer without removing the messsages.
bool erase(const Value &value)
Erase a value.
DynamicSubBuffer< T, Clock > & sub(modem_id_type dest_id, const subbuffer_id_type &sub_id)
Reference a given subbuffer.
void replace(modem_id_type dest_id, const subbuffer_id_type &sub_id, const goby::acomms::protobuf::DynamicBufferConfig &cfg)
Replace an existing subbuffer with the given configuration (any messages in the subbuffer will be era...
void create(modem_id_type dest_id, const subbuffer_id_type &sub_id, const std::vector< goby::acomms::protobuf::DynamicBufferConfig > &cfgs)
Create a new subbuffer merging the given configuration (See DynamicSubBuffer() for details)
Value top(modem_id_type dest_id=goby::acomms::QUERY_DESTINATION_ID, size_type max_bytes=std::numeric_limits< size_type >::max(), typename Clock::duration ack_timeout=std::chrono::microseconds(0))
Returns the top value in a priority contest between all subbuffers.
void create(modem_id_type dest_id, const subbuffer_id_type &sub_id, const goby::acomms::protobuf::DynamicBufferConfig &cfg)
Create a new subbuffer with the given configuration.
void remove(modem_id_type dest_id, const subbuffer_id_type &sub_id)
Remove an existing subbuffer.
size_type size() const
Size of the buffer (that is, sum of the subbuffer sizes)
Represents a time-dependent priority queue for a single group of messages (e.g. for a single DCCL ID)
DynamicSubBuffer(const std::vector< goby::acomms::protobuf::DynamicBufferConfig > &cfgs)
Create a subbuffer merging two or more configuration objects.
std::vector< Value > expire(typename Clock::time_point reference=Clock::now())
Erase any values that have exceeded their time-to-live.
bool erase(const Value &value)
Erase a value.
const goby::acomms::protobuf::DynamicBufferConfig & cfg() const
Return the aggregate configuration.
bool in_blackout(typename Clock::time_point reference=Clock::now()) const
Returns if buffer is in blackout.
typename std::deque< T >::size_type size_type
bool empty() const
Returns if this queue is empty.
void update(const std::vector< goby::acomms::protobuf::DynamicBufferConfig > &cfgs)
Update the configurations without clearing the buffer.
bool all_waiting_for_ack(typename Clock::time_point reference=Clock::now(), typename Clock::duration ack_timeout=std::chrono::microseconds(0)) const
returns true if all messages have been sent within ack_timeout of the reference provided and thus non...
void pop()
Pop the value on the top of the queue.
std::vector< Value > push(const T &t, typename Clock::time_point reference=Clock::now())
Push a value to the queue.
Value & top(typename Clock::time_point reference=Clock::now(), typename Clock::duration ack_timeout=std::chrono::microseconds(0))
Returns the value at the top of the queue that hasn't been sent wihin ack_timeout.
size_type size() const
Retrieves the size of the queue.
size_t top_size(typename Clock::time_point reference=Clock::now(), typename Clock::duration ack_timeout=std::chrono::microseconds(0)) const
Returns the size (in bytes) of the top of the queue that hasn't been sent within ack_timeout.
DynamicSubBuffer(const goby::acomms::protobuf::DynamicBufferConfig &cfg)
Create a subbuffer with the given configuration.
std::pair< double, ValueResult > top_value(typename Clock::time_point reference=Clock::now(), size_type max_bytes=std::numeric_limits< size_type >::max(), typename Clock::duration ack_timeout=std::chrono::microseconds(0)) const
Provides the numerical priority value based on this subbuffer's base priority, time-to-live (ttl) and...
PROTOBUF_ATTRIBUTE_REINITIALIZES void Clear() final
void add_group(const std::string &name, Colors::Color color=Colors::nocolor, const std::string &description="")
Add another group to the logger. A group provides related manipulator for categorizing log messages.
goby::util::logger::GroupSetter group(std::string n)
size_t data_size(const Container &c)
constexpr int QUERY_DESTINATION_ID
special modem id used internally to goby-acomms for indicating that the MAC layer (amac) is agnostic ...
The global namespace for the Goby project.
util::FlexOstream glog
Access the Goby logger through this object.
STL namespace.
bool operator==(const Value &a) const