24 #ifndef GOBY_ACOMMS_BUFFER_DYNAMIC_BUFFER_H
25 #define GOBY_ACOMMS_BUFFER_DYNAMIC_BUFFER_H
28 #include <type_traits>
29 #include <unordered_map>
59 template <
typename Container>
size_t data_size(
const Container& c) {
return c.size(); }
65 using size_type =
typename std::deque<T>::size_type;
96 void update(
const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
100 throw(
goby::Exception(
"Configuration vector must not be empty for DynamicSubBuffer"));
106 std::result_of<decltype (&DynamicBufferConfig::ttl)(DynamicBufferConfig)>::
type;
107 using value_base_type =
108 std::result_of<decltype (&DynamicBufferConfig::value_base)(DynamicBufferConfig)>::
type;
110 ttl_type ttl_sum = 0;
111 ttl_type ttl_divisor = 0;
112 value_base_type value_base_sum = 0;
113 value_base_type value_base_divisor = 0;
115 for (
const auto&
cfg : cfgs)
137 ++value_base_divisor;
142 cfg_.
set_ttl(ttl_sum / ttl_divisor);
143 if (value_base_divisor > 0)
156 Value&
top(
typename Clock::time_point reference = Clock::now(),
157 typename Clock::duration ack_timeout = std::chrono::microseconds(0))
159 for (
auto& datum_pair : data_)
161 auto& datum_last_access = datum_pair.first;
162 if (datum_last_access == zero_point_ || datum_last_access + ack_timeout < reference)
164 last_access_ = reference;
165 datum_last_access = last_access_;
166 return datum_pair.second;
173 size_t top_size(
typename Clock::time_point reference = Clock::now(),
174 typename Clock::duration ack_timeout = std::chrono::microseconds(0))
const
176 for (
const auto& datum_pair : data_)
178 const auto& datum_last_access = datum_pair.first;
179 if (datum_last_access == zero_point_ || datum_last_access + ack_timeout < reference)
180 return data_size(datum_pair.second.data);
188 typename Clock::duration ack_timeout = std::chrono::microseconds(0))
const
190 for (
const auto& datum_pair : data_)
192 const auto& datum_last_access = datum_pair.first;
193 if (datum_last_access == zero_point_ || datum_last_access + ack_timeout < reference)
214 std::pair<double, ValueResult>
215 top_value(
typename Clock::time_point reference = Clock::now(),
216 size_type max_bytes = std::numeric_limits<size_type>::max(),
217 typename Clock::duration ack_timeout = std::chrono::microseconds(0))
const
222 return std::make_pair(-std::numeric_limits<double>::infinity(),
224 else if (
top_size(reference, ack_timeout) > max_bytes)
225 return std::make_pair(-std::numeric_limits<double>::infinity(),
228 return std::make_pair(-std::numeric_limits<double>::infinity(),
231 using Duration = std::chrono::microseconds;
233 double dt = std::chrono::duration_cast<Duration>(reference - last_access_).count();
234 double ttl = goby::time::convert_duration<Duration>(cfg_.
ttl_with_units()).count();
237 double v = v_b * dt / ttl;
244 bool in_blackout(
typename Clock::time_point reference = Clock::now())
const
249 return reference <= (last_access_ + blackout);
252 bool empty()
const {
return data_.empty(); }
258 void pop() { data_.pop_front(); }
265 std::vector<Value>
push(
const T& t,
typename Clock::time_point reference = Clock::now())
267 std::vector<Value> exceeded;
270 data_.push_front(std::make_pair(zero_point_, Value({reference, t})));
272 data_.push_back(std::make_pair(zero_point_, Value({reference, t})));
276 exceeded.push_back(data_.back().second);
285 std::vector<Value>
expire(
typename Clock::time_point reference = Clock::now())
287 std::vector<Value> expired;
289 auto ttl = goby::time::convert_duration<typename Clock::duration>(cfg_.
ttl_with_units());
292 while (!data_.empty() && reference > (data_.back().second.push_time + ttl))
294 expired.push_back(data_.back().second);
300 while (!data_.empty() && reference > (data_.front().second.push_time + ttl))
302 expired.push_back(data_.front().second);
317 for (
auto it = data_.begin(), end = data_.end(); it != end; ++it)
319 const auto& datum_pair = it->second;
320 if (datum_pair == value)
327 if (cfg_.
newest_first() && datum_pair.push_time < value.push_time)
329 else if (!cfg_.
newest_first() && datum_pair.push_time > value.push_time)
339 std::deque<std::pair<typename Clock::time_point, Value>> data_;
340 typename Clock::time_point last_access_{Clock::now()};
342 typename Clock::time_point zero_point_{std::chrono::seconds(0)};
346 template <
typename T,
typename Clock = goby::time::SteadyClock>
class DynamicBuffer
352 glog_priority_group_ =
"goby::acomms::buffer::priority::" +
std::to_string(
id);
378 create(dest_id, sub_id, std::vector<goby::acomms::protobuf::DynamicBufferConfig>(1, cfg));
388 const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
390 if (sub_.count(dest_id) && sub_.at(dest_id).count(sub_id))
391 throw(
goby::Exception(
"Subbuffer ID: " + sub_id +
" already exists."));
404 replace(dest_id, sub_id, std::vector<goby::acomms::protobuf::DynamicBufferConfig>(1, cfg));
413 const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
416 create(dest_id, sub_id, cfgs);
427 update(dest_id, sub_id, std::vector<goby::acomms::protobuf::DynamicBufferConfig>(1, cfg));
436 const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
438 auto it = sub_[dest_id].find(sub_id);
439 if (it != sub_[dest_id].end())
440 it->second.update(cfgs);
442 create(dest_id, sub_id, cfgs);
451 sub_[dest_id].erase(sub_id);
459 std::vector<Value>
push(
const Value& fvt)
461 std::vector<Value> exceeded;
462 auto sub_exceeded =
sub(fvt.modem_id, fvt.subbuffer_id).push(fvt.data, fvt.push_time);
463 for (
const auto&
e : sub_exceeded)
464 exceeded.push_back({fvt.modem_id, fvt.subbuffer_id,
e.push_time,
e.data});
471 for (
const auto& sub_id_p : sub_)
473 for (
const auto& sub_p : sub_id_p.second)
475 if (!sub_p.second.empty())
487 for (
const auto& sub_id_p : sub_)
489 for (
const auto& sub_p : sub_id_p.second)
size += sub_p.second.size();
501 size_type max_bytes = std::numeric_limits<size_type>::max(),
502 typename Clock::duration ack_timeout = std::chrono::microseconds(0))
507 glog <<
group(glog_priority_group_) <<
"Starting priority contest (dest: "
510 <<
", max_bytes: " << max_bytes <<
"):" << std::endl;
512 typename std::unordered_map<subbuffer_id_type, DynamicSubBuffer<T, Clock>>::iterator
514 double winning_value = -std::numeric_limits<double>::infinity();
516 auto now = Clock::now();
523 : sub_.find(dest_id),
526 : ++sub_.find(dest_id);
527 sub_id_it != sub_id_end; ++sub_id_it)
529 for (
auto sub_it = sub_id_it->second.begin(), sub_end = sub_id_it->second.end();
530 sub_it != sub_end; ++sub_it)
534 std::tie(value, result) = sub_it->second.top_value(now, max_bytes, ack_timeout);
536 std::string value_or_reason;
544 value_or_reason =
"empty";
548 value_or_reason =
"blackout";
552 value_or_reason =
"too large";
556 value_or_reason =
"ack wait";
561 <<
" [dest: " << sub_id_it->first
562 <<
", n: " << sub_it->second.size()
563 <<
"]: " << value_or_reason << std::endl;
565 if (value > winning_value)
567 winning_value = value;
568 winning_sub = sub_it;
569 dest_id = sub_id_it->first;
574 if (winning_value == -std::numeric_limits<double>::infinity())
577 const auto& top_p = winning_sub->second.top(now, ack_timeout);
579 <<
" (" <<
data_size(top_p.data) <<
"B)" << std::endl;
581 return {dest_id, winning_sub->first, top_p.push_time, top_p.data};
591 return sub(value.modem_id, value.subbuffer_id).erase({value.push_time, value.data});
599 auto now = Clock::now();
600 std::vector<Value> expired;
601 for (
auto& sub_id_p : sub_)
603 for (
auto& sub_p : sub_id_p.second)
605 auto sub_expired = sub_p.second.expire(now);
606 for (
const auto&
e : sub_expired)
607 expired.push_back({sub_id_p.first, sub_p.first,
e.push_time,
e.data});
618 if (!sub_.count(dest_id) || !sub_.at(dest_id).count(sub_id))
620 " does not exist, must call create(...) first."));
621 return sub_.at(dest_id).at(sub_id);
626 std::map<modem_id_type, std::unordered_map<subbuffer_id_type, DynamicSubBuffer<T, Clock>>> sub_;
628 std::string glog_priority_group_;
629 static std::atomic<int> count_;
633 template <
typename T,
typename Clock> std::atomic<int> DynamicBuffer<T, Clock>::count_(0);