24#ifndef GOBY_ACOMMS_BUFFER_DYNAMIC_BUFFER_H
25#define GOBY_ACOMMS_BUFFER_DYNAMIC_BUFFER_H
29#include <unordered_map>
47 return a.SerializeAsString() == b.SerializeAsString();
62template <
typename Container>
size_t data_size(
const Container& c) {
return c.size(); }
68 using size_type =
typename std::deque<T>::size_type;
99 void update(
const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
103 throw(
goby::Exception(
"Configuration vector must not be empty for DynamicSubBuffer"));
109 std::result_of<
decltype (&DynamicBufferConfig::ttl)(DynamicBufferConfig)>::type;
110 using value_base_type =
111 std::result_of<
decltype (&DynamicBufferConfig::value_base)(DynamicBufferConfig)>::type;
113 ttl_type ttl_sum = 0;
114 ttl_type ttl_divisor = 0;
115 value_base_type value_base_sum = 0;
116 value_base_type value_base_divisor = 0;
118 for (
const auto&
cfg : cfgs)
140 ++value_base_divisor;
145 cfg_.
set_ttl(ttl_sum / ttl_divisor);
146 if (value_base_divisor > 0)
163 Value&
top(
typename Clock::time_point reference = Clock::now(),
164 typename Clock::duration ack_timeout = std::chrono::microseconds(0))
166 for (
auto& datum_pair : data_)
168 auto& datum_last_access = datum_pair.first;
169 if (datum_last_access == zero_point_ || datum_last_access + ack_timeout < reference)
171 last_access_ = reference;
172 datum_last_access = last_access_;
173 return datum_pair.second;
180 size_t top_size(
typename Clock::time_point reference = Clock::now(),
181 typename Clock::duration ack_timeout = std::chrono::microseconds(0))
const
183 for (
const auto& datum_pair : data_)
185 const auto& datum_last_access = datum_pair.first;
186 if (datum_last_access == zero_point_ || datum_last_access + ack_timeout < reference)
187 return data_size(datum_pair.second.data);
195 typename Clock::duration ack_timeout = std::chrono::microseconds(0))
const
197 for (
const auto& datum_pair : data_)
199 const auto& datum_last_access = datum_pair.first;
200 if (datum_last_access == zero_point_ || datum_last_access + ack_timeout < reference)
221 std::pair<double, ValueResult>
222 top_value(
typename Clock::time_point reference = Clock::now(),
223 size_type max_bytes = std::numeric_limits<size_type>::max(),
224 typename Clock::duration ack_timeout = std::chrono::microseconds(0))
const
230 return std::make_pair(-std::numeric_limits<double>::infinity(),
233 return std::make_pair(-std::numeric_limits<double>::infinity(),
238 return std::make_pair(-std::numeric_limits<double>::infinity(),
241 using Duration = std::chrono::microseconds;
243 double dt = std::chrono::duration_cast<Duration>(reference - last_access_).count();
244 double ttl = goby::time::convert_duration<Duration>(cfg_.
ttl_with_units()).count();
247 double v = v_b * dt / ttl;
254 bool in_blackout(
typename Clock::time_point reference = Clock::now())
const
259 return reference <= (last_access_ + blackout);
262 bool empty()
const {
return data_.empty(); }
268 void pop() { data_.pop_front(); }
275 std::vector<Value>
push(
const T& t,
typename Clock::time_point reference = Clock::now())
277 std::vector<Value> exceeded;
280 data_.push_front(std::make_pair(zero_point_,
Value({reference, t})));
282 data_.push_back(std::make_pair(zero_point_,
Value({reference, t})));
286 exceeded.push_back(data_.back().second);
295 std::vector<Value>
expire(
typename Clock::time_point reference = Clock::now())
297 std::vector<Value> expired;
299 auto ttl = goby::time::convert_duration<typename Clock::duration>(cfg_.
ttl_with_units());
302 while (!data_.empty() && reference > (data_.back().second.push_time + ttl))
304 expired.push_back(data_.back().second);
310 while (!data_.empty() && reference > (data_.front().second.push_time + ttl))
312 expired.push_back(data_.front().second);
327 for (
auto it = data_.begin(), end = data_.end(); it != end; ++it)
329 const auto& datum_pair = it->second;
330 if (datum_pair == value)
337 if (cfg_.
newest_first() && datum_pair.push_time < value.push_time)
339 else if (!cfg_.
newest_first() && datum_pair.push_time > value.push_time)
349 std::deque<std::pair<typename Clock::time_point, Value>> data_;
350 typename Clock::time_point last_access_{Clock::now()};
352 typename Clock::time_point zero_point_{std::chrono::seconds(0)};
356template <
typename T,
typename Clock = goby::time::SteadyClock>
class DynamicBuffer
362 glog_priority_group_ =
"goby::acomms::buffer::priority::" + std::to_string(
id);
388 create(dest_id, sub_id, std::vector<goby::acomms::protobuf::DynamicBufferConfig>(1, cfg));
398 const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
400 if (sub_.count(dest_id) && sub_.at(dest_id).count(sub_id))
401 throw(
goby::Exception(
"Subbuffer ID: " + sub_id +
" already exists."));
414 replace(dest_id, sub_id, std::vector<goby::acomms::protobuf::DynamicBufferConfig>(1, cfg));
423 const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
426 create(dest_id, sub_id, cfgs);
437 update(dest_id, sub_id, std::vector<goby::acomms::protobuf::DynamicBufferConfig>(1, cfg));
446 const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
448 auto it = sub_[dest_id].find(sub_id);
449 if (it != sub_[dest_id].end())
450 it->second.update(cfgs);
452 create(dest_id, sub_id, cfgs);
461 sub_[dest_id].erase(sub_id);
471 std::vector<Value> exceeded;
473 for (
const auto& e : sub_exceeded)
481 for (
const auto& sub_id_p : sub_)
483 for (
const auto& sub_p : sub_id_p.second)
485 if (!sub_p.second.empty())
497 for (
const auto& sub_id_p : sub_)
499 for (
const auto& sub_p : sub_id_p.second) size += sub_p.second.size();
511 size_type max_bytes = std::numeric_limits<size_type>::max(),
512 typename Clock::duration ack_timeout = std::chrono::microseconds(0))
517 glog <<
group(glog_priority_group_) <<
"Starting priority contest (dest: "
519 : std::to_string(dest_id))
520 <<
", max_bytes: " << max_bytes <<
"):" << std::endl;
522 typename std::unordered_map<subbuffer_id_type, DynamicSubBuffer<T, Clock>>::iterator
524 double winning_value = -std::numeric_limits<double>::infinity();
526 auto now = Clock::now();
530 "DynamicBuffer::top() has no queues with this destination"));
534 : sub_.find(dest_id),
537 : ++sub_.find(dest_id);
538 sub_id_it != sub_id_end; ++sub_id_it)
540 for (
auto sub_it = sub_id_it->second.begin(), sub_end = sub_id_it->second.end();
541 sub_it != sub_end; ++sub_it)
545 std::tie(value, result) = sub_it->second.top_value(now, max_bytes, ack_timeout);
547 std::string value_or_reason;
551 value_or_reason = std::to_string(value);
555 value_or_reason =
"empty";
559 value_or_reason =
"blackout";
563 value_or_reason =
"too large";
567 value_or_reason =
"ack wait";
572 <<
" [dest: " << sub_id_it->first
573 <<
", n: " << sub_it->second.size()
574 <<
"]: " << value_or_reason << std::endl;
576 if (value > winning_value)
578 winning_value = value;
579 winning_sub = sub_it;
580 dest_id = sub_id_it->first;
585 if (winning_value == -std::numeric_limits<double>::infinity())
587 "DynamicBuffer::top() has no queue with a winning value"));
589 const auto& top_p = winning_sub->second.top(now, ack_timeout);
591 <<
" (" <<
data_size(top_p.data) <<
"B)" << std::endl;
593 return {dest_id, winning_sub->first, top_p.push_time, top_p.data};
603 return sub(value.modem_id, value.subbuffer_id).erase({value.push_time, value.data});
611 auto now = Clock::now();
612 std::vector<Value> expired;
613 for (
auto& sub_id_p : sub_)
615 for (
auto& sub_p : sub_id_p.second)
617 auto sub_expired = sub_p.second.expire(now);
618 for (
const auto& e : sub_expired)
619 expired.push_back({sub_id_p.first, sub_p.first, e.push_time, e.data});
630 if (!sub_.count(dest_id) || !sub_.at(dest_id).count(sub_id))
632 " does not exist, must call create(...) first."));
633 return sub_.at(dest_id).at(sub_id);
638 std::map<modem_id_type, std::unordered_map<subbuffer_id_type, DynamicSubBuffer<T, Clock>>> sub_;
640 std::string glog_priority_group_;
641 static std::atomic<int> count_;
645template <
typename T,
typename Clock> std::atomic<int> DynamicBuffer<T, Clock>::count_(0);
simple exception class for goby applications
~DynamicBufferNoDataException()
DynamicBufferNoDataException(const std::string &reason)
Represents a time-dependent priority queue for several groups of messages (multiple DynamicSubBuffers...
bool empty() const
Is this buffer empty (that is, are all subbuffers empty)?
std::vector< Value > push(const Value &fvt)
Push a new message to the buffer.
void replace(modem_id_type dest_id, const subbuffer_id_type &sub_id, const std::vector< goby::acomms::protobuf::DynamicBufferConfig > &cfgs)
Replace an existing subbuffer merging the given configuration (See DynamicSubBuffer() for details)
std::vector< Value > expire()
Erase any values that have exceeded their time-to-live.
typename DynamicSubBuffer< T, Clock >::size_type size_type
void update(modem_id_type dest_id, const subbuffer_id_type &sub_id, const std::vector< goby::acomms::protobuf::DynamicBufferConfig > &cfgs)
Update an existing subbuffer without removing the messsages (or creates the buffer if it doesn't alre...
void update(modem_id_type dest_id, const subbuffer_id_type &sub_id, const goby::acomms::protobuf::DynamicBufferConfig &cfg)
Update an existing subbuffer without removing the messsages.
bool erase(const Value &value)
Erase a value.
DynamicSubBuffer< T, Clock > & sub(modem_id_type dest_id, const subbuffer_id_type &sub_id)
Reference a given subbuffer.
void replace(modem_id_type dest_id, const subbuffer_id_type &sub_id, const goby::acomms::protobuf::DynamicBufferConfig &cfg)
Replace an existing subbuffer with the given configuration (any messages in the subbuffer will be era...
void create(modem_id_type dest_id, const subbuffer_id_type &sub_id, const std::vector< goby::acomms::protobuf::DynamicBufferConfig > &cfgs)
Create a new subbuffer merging the given configuration (See DynamicSubBuffer() for details)
Value top(modem_id_type dest_id=goby::acomms::QUERY_DESTINATION_ID, size_type max_bytes=std::numeric_limits< size_type >::max(), typename Clock::duration ack_timeout=std::chrono::microseconds(0))
Returns the top value in a priority contest between all subbuffers.
void create(modem_id_type dest_id, const subbuffer_id_type &sub_id, const goby::acomms::protobuf::DynamicBufferConfig &cfg)
Create a new subbuffer with the given configuration.
void remove(modem_id_type dest_id, const subbuffer_id_type &sub_id)
Remove an existing subbuffer.
std::string subbuffer_id_type
size_type size() const
Size of the buffer (that is, sum of the subbuffer sizes)
Represents a time-dependent priority queue for a single group of messages (e.g. for a single DCCL ID)
@ ALL_MESSAGES_WAITING_FOR_ACK
DynamicSubBuffer(const std::vector< goby::acomms::protobuf::DynamicBufferConfig > &cfgs)
Create a subbuffer merging two or more configuration objects.
std::vector< Value > expire(typename Clock::time_point reference=Clock::now())
Erase any values that have exceeded their time-to-live.
bool erase(const Value &value)
Erase a value.
const goby::acomms::protobuf::DynamicBufferConfig & cfg() const
Return the aggregate configuration.
bool in_blackout(typename Clock::time_point reference=Clock::now()) const
Returns if buffer is in blackout.
typename std::deque< T >::size_type size_type
bool empty() const
Returns if this queue is empty.
void update(const std::vector< goby::acomms::protobuf::DynamicBufferConfig > &cfgs)
Update the configurations without clearing the buffer.
bool all_waiting_for_ack(typename Clock::time_point reference=Clock::now(), typename Clock::duration ack_timeout=std::chrono::microseconds(0)) const
returns true if all messages have been sent within ack_timeout of the reference provided and thus non...
void pop()
Pop the value on the top of the queue.
std::vector< Value > push(const T &t, typename Clock::time_point reference=Clock::now())
Push a value to the queue.
Value & top(typename Clock::time_point reference=Clock::now(), typename Clock::duration ack_timeout=std::chrono::microseconds(0))
Returns the value at the top of the queue that hasn't been sent wihin ack_timeout.
size_type size() const
Retrieves the size of the queue.
size_t top_size(typename Clock::time_point reference=Clock::now(), typename Clock::duration ack_timeout=std::chrono::microseconds(0)) const
Returns the size (in bytes) of the top of the queue that hasn't been sent within ack_timeout.
DynamicSubBuffer(const goby::acomms::protobuf::DynamicBufferConfig &cfg)
Create a subbuffer with the given configuration.
std::pair< double, ValueResult > top_value(typename Clock::time_point reference=Clock::now(), size_type max_bytes=std::numeric_limits< size_type >::max(), typename Clock::duration ack_timeout=std::chrono::microseconds(0)) const
Provides the numerical priority value based on this subbuffer's base priority, time-to-live (ttl) and...
bool has_value_base() const
PROTOBUF_ATTRIBUTE_REINITIALIZES void Clear() final
Quantity ttl_with_units() const
uint32_t max_queue() const
bool has_newest_first() const
Quantity blackout_time_with_units() const
bool has_blackout_time() const
bool has_ack_required() const
void set_value_base(double value)
void set_blackout_time(double value)
bool has_max_queue() const
void set_max_queue(uint32_t value)
bool newest_first() const
void set_ack_required(bool value)
double value_base() const
void set_newest_first(bool value)
bool ack_required() const
void set_ttl(double value)
double blackout_time() const
void add_group(const std::string &name, Colors::Color color=Colors::nocolor, const std::string &description="")
Add another group to the logger. A group provides related manipulator for categorizing log messages.
goby::util::logger::GroupSetter group(std::string n)
size_t data_size(const Container &c)
constexpr int QUERY_DESTINATION_ID
special modem id used internally to goby-acomms for indicating that the MAC layer (amac) is agnostic ...
The global namespace for the Goby project.
util::FlexOstream glog
Access the Goby logger through this object.
Clock::time_point push_time
subbuffer_id_type subbuffer_id
Clock::time_point push_time
bool operator==(const Value &a) const