Goby3 3.2.3
2025.05.13
Loading...
Searching...
No Matches
driver_thread.h
Go to the documentation of this file.
1// Copyright 2019-2023:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6//
7//
8// This file is part of the Goby Underwater Autonomy Project Libraries
9// ("The Goby Libraries").
10//
11// The Goby Libraries are free software: you can redistribute them and/or modify
12// them under the terms of the GNU Lesser General Public License as published by
13// the Free Software Foundation, either version 2.1 of the License, or
14// (at your option) any later version.
15//
16// The Goby Libraries are distributed in the hope that they will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU Lesser General Public License for more details.
20//
21// You should have received a copy of the GNU Lesser General Public License
22// along with Goby. If not, see <http://www.gnu.org/licenses/>.
23
24#ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERVEHICLE_DRIVER_THREAD_H
25#define GOBY_MIDDLEWARE_TRANSPORT_INTERVEHICLE_DRIVER_THREAD_H
26
27#include <algorithm>
28#include <chrono>
29#include <cstddef>
30#include <map>
31#include <memory>
32#include <ostream>
33#include <set>
34#include <string>
35#include <vector>
36
37#include <boost/units/quantity.hpp>
38
51#include "goby/time/convert.h"
54#include "goby/time/types.h"
56
57namespace goby
58{
59namespace acomms
60{
61namespace protobuf
62{
63class ModemTransmission;
64} // namespace protobuf
65} // namespace acomms
66
67namespace middleware
68{
69template <typename Data> class Publisher;
70
71namespace protobuf
72{
73inline size_t data_size(const SerializerTransporterMessage& msg) { return msg.data().size(); }
74
76{
77 return (a.key().serialize_time() == b.key().serialize_time() &&
79 a.key().type() == b.key().type() && a.key().group() == b.key().group() &&
80 a.data() == b.data());
81}
82
84{
85 if (a.key().serialize_time() != b.key().serialize_time())
86 return a.key().serialize_time() < b.key().serialize_time();
87 else if (a.key().marshalling_scheme() != b.key().marshalling_scheme())
88 return a.key().marshalling_scheme() < b.key().marshalling_scheme();
89 else if (a.key().type() != b.key().type())
90 return a.key().type() < b.key().type();
91 else if (a.key().group() != b.key().group())
92 return a.key().group() < b.key().group();
93 else
94 return a.data() < b.data();
95}
96
97} // namespace protobuf
98
99namespace intervehicle
100{
101namespace protobuf
102{
103inline bool operator==(const TransporterConfig& a, const TransporterConfig& b)
104{
105 return a.SerializeAsString() == b.SerializeAsString();
106}
107
108} // namespace protobuf
109
110template <typename Data>
111std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage>
112serialize_publication(const Data& d, const Group& group, const Publisher<Data>& publisher)
113{
115 auto* sbytes = new std::string(bytes.begin(), bytes.end());
116 auto msg = std::make_shared<goby::middleware::protobuf::SerializerTransporterMessage>();
117
118 auto* key = msg->mutable_key();
119 key->set_marshalling_scheme(MarshallingScheme::DCCL);
121 key->set_group(std::string(group));
122 key->set_group_numeric(group.numeric());
123 auto now = goby::time::SystemClock::now<goby::time::MicroTime>();
124 key->set_serialize_time_with_units(now);
125 *key->mutable_cfg() = publisher.cfg();
126 msg->set_allocated_data(sbytes);
127 return msg;
128}
129
131 : public goby::middleware::Thread<intervehicle::protobuf::PortalConfig::LinkConfig,
132 InterProcessForwarder<InterThreadTransporter>>
133{
134 public:
138
140 void loop() override;
141 int tx_queue_size() { return buffer_.size(); }
142
143 private:
145 void _buffer_message(
146 const std::shared_ptr<const goby::middleware::protobuf::SerializerTransporterMessage>& msg);
147 void _receive(const goby::acomms::protobuf::ModemTransmission& rx_msg);
148 void _forward_subscription(intervehicle::protobuf::Subscription subscription);
149 void _accept_subscription(const intervehicle::protobuf::Subscription& subscription);
150 void _expire_value(const goby::time::SteadyClock::time_point now,
153
154 subbuffer_id_type _create_buffer_id(unsigned dccl_id, unsigned group);
155
157 _create_buffer_id(const goby::middleware::protobuf::SerializerTransporterKey& key)
158 {
159 return _create_buffer_id(detail::DCCLSerializerParserHelperBase::id(key.type()),
160 key.group_numeric());
161 }
162
163 subbuffer_id_type _create_buffer_id(const intervehicle::protobuf::Subscription& subscription)
164 {
165 return _create_buffer_id(subscription.dccl_id(), subscription.group());
166 }
167
168 void _try_create_or_update_buffer(modem_id_type dest_id, const subbuffer_id_type& buffer_id);
169
170 modem_id_type _broadcast_id() { return cfg().modem_id() & cfg().subnet_mask(); }
171
172 // id within subnet
173 modem_id_type _id_within_subnet(modem_id_type id) { return id - _broadcast_id(); }
174
175 // full id
176 modem_id_type _full_id(modem_id_type id_in_subnet) { return id_in_subnet + _broadcast_id(); }
177
178 bool _dest_is_in_subnet(modem_id_type dest_id)
179 {
180 bool dest_in_subnet =
181 (dest_id & cfg().subnet_mask()) == (cfg().modem_id() & cfg().subnet_mask());
182 if (!dest_in_subnet)
184 << "Dest: " << dest_id
185 << " is not in subnet (our id: " << cfg().modem_id()
186 << ", mask: " << cfg().subnet_mask() << ")" << std::endl;
187
188 return dest_in_subnet;
189 }
190
191 void _publish_subscription_report(const intervehicle::protobuf::Subscription& changed);
192
193 private:
194 std::unique_ptr<InterThreadTransporter> interthread_;
195 std::unique_ptr<InterProcessForwarder<InterThreadTransporter>> interprocess_;
196
197 std::multimap<subbuffer_id_type, goby::middleware::protobuf::SerializerTransporterKey>
198 publisher_buffer_cfg_;
199
200 std::map<modem_id_type, std::multimap<subbuffer_id_type, intervehicle::protobuf::Subscription>>
201 subscriber_buffer_cfg_;
202
203 std::map<subbuffer_id_type, std::set<modem_id_type>> subbuffers_created_;
204
206 std::set<modem_id_type> subscription_subbuffers_;
207
209
210 using frame_type = int;
211 std::map<frame_type, std::vector<goby::acomms::DynamicBuffer<buffer_data_type>::Value>>
212 pending_ack_;
213
214 std::unique_ptr<goby::acomms::ModemDriverBase> driver_;
216
217 std::string glog_group_;
218
219 static std::map<std::string, void*> driver_plugins_;
220
221 goby::time::SteadyClock::time_point next_modem_report_time_;
222 const goby::time::SteadyClock::duration modem_report_interval_;
223};
224
225} // namespace intervehicle
226} // namespace middleware
227} // namespace goby
228
229#endif
Represents a time-dependent priority queue for several groups of messages (multiple DynamicSubBuffers...
provides an API to the goby-acomms MAC library. MACManager is essentially a std::list<protobuf::Modem...
Definition mac_manager.h:51
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:60
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition publisher.h:40
const goby::middleware::protobuf::TransporterConfig & cfg() const
Returns the metadata configuration.
Definition publisher.h:81
Represents a thread of execution within the Goby middleware, interleaving periodic events (loop()) wi...
Definition thread.h:61
goby::acomms::DynamicBuffer< buffer_data_type >::modem_id_type modem_id_type
goby::acomms::DynamicBuffer< buffer_data_type >::subbuffer_id_type subbuffer_id_type
ModemDriverThread(const intervehicle::protobuf::PortalConfig::LinkConfig &cfg)
const ::goby::middleware::protobuf::SerializerTransporterKey & key() const
goby::util::logger::GroupSetter group(std::string n)
bool operator==(const TransporterConfig &a, const TransporterConfig &b)
std::shared_ptr< goby::middleware::protobuf::SerializerTransporterMessage > serialize_publication(const Data &d, const Group &group, const Publisher< Data > &publisher)
bool operator<(const TCPEndPoint &ep_a, const TCPEndPoint &ep_b)
size_t data_size(const SerializerTransporterMessage &msg)
bool operator==(const TCPEndPoint &ep_a, const TCPEndPoint &ep_b)
The global namespace for the Goby project.
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::PROTOBUF_NAMESPACE_ID::MessageOptions, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
util::FlexOstream glog
Access the Goby logger through this object.
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
Definition interface.h:98
static unsigned id(CharIterator begin, CharIterator end)
std::chrono::time_point< SteadyClock > time_point
std::chrono::microseconds duration
Duration type.