Goby3 3.4.0
2026.04.13
Loading...
Searching...
No Matches
driver_thread.h
Go to the documentation of this file.
1// Copyright 2019-2026:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6//
7//
8// This file is part of the Goby Underwater Autonomy Project Libraries
9// ("The Goby Libraries").
10//
11// The Goby Libraries are free software: you can redistribute them and/or modify
12// them under the terms of the GNU Lesser General Public License as published by
13// the Free Software Foundation, either version 2.1 of the License, or
14// (at your option) any later version.
15//
16// The Goby Libraries are distributed in the hope that they will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU Lesser General Public License for more details.
20//
21// You should have received a copy of the GNU Lesser General Public License
22// along with Goby. If not, see <http://www.gnu.org/licenses/>.
23
24#ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERVEHICLE_DRIVER_THREAD_H
25#define GOBY_MIDDLEWARE_TRANSPORT_INTERVEHICLE_DRIVER_THREAD_H
26
27#include <cstddef>
28#include <map>
29#include <memory>
30#include <ostream>
31#include <set>
32#include <string>
33#include <vector>
34
35#include <boost/units/quantity.hpp>
36
49#include "goby/time/convert.h"
52#include "goby/time/types.h"
54
55namespace goby
56{
57namespace acomms
58{
59namespace protobuf
60{
61class ModemTransmission;
62} // namespace protobuf
63} // namespace acomms
64
65namespace middleware
66{
67template <typename Data> class Publisher;
68
69namespace protobuf
70{
71inline size_t data_size(const SerializerTransporterMessage& msg) { return msg.data().size(); }
72
74{
75 return (a.key().serialize_time() == b.key().serialize_time() &&
77 a.key().type() == b.key().type() && a.key().group() == b.key().group() &&
78 a.data() == b.data());
79}
80
82{
83 if (a.key().serialize_time() != b.key().serialize_time())
84 return a.key().serialize_time() < b.key().serialize_time();
85 else if (a.key().marshalling_scheme() != b.key().marshalling_scheme())
86 return a.key().marshalling_scheme() < b.key().marshalling_scheme();
87 else if (a.key().type() != b.key().type())
88 return a.key().type() < b.key().type();
89 else if (a.key().group() != b.key().group())
90 return a.key().group() < b.key().group();
91 else
92 return a.data() < b.data();
93}
94
95} // namespace protobuf
96
97namespace intervehicle
98{
99namespace protobuf
100{
101inline bool operator==(const TransporterConfig& a, const TransporterConfig& b)
102{
103 return a.SerializeAsString() == b.SerializeAsString();
104}
105
106} // namespace protobuf
107
108template <typename Data>
109std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage>
110serialize_publication(const Data& d, const Group& group, const Publisher<Data>& publisher)
111{
113 auto* sbytes = new std::string(bytes.begin(), bytes.end());
114 auto msg = std::make_shared<goby::middleware::protobuf::SerializerTransporterMessage>();
115
116 auto* key = msg->mutable_key();
117 key->set_marshalling_scheme(MarshallingScheme::DCCL);
119 key->set_group(std::string(group));
120 key->set_group_numeric(group.numeric());
121 auto now = goby::time::SystemClock::now<goby::time::MicroTime>();
122 key->set_serialize_time_with_units(now);
123 *key->mutable_cfg() = publisher.cfg();
124 msg->set_allocated_data(sbytes);
125 return msg;
126}
127
131template <typename ImplementationTag>
133 : public goby::middleware::Thread<intervehicle::protobuf::PortalConfig::LinkConfig,
134 InterProcessForwarder<InterThreadTransporter,
135 ImplementationTag>>
136{
137 public:
141
143 void loop() override;
144 int tx_queue_size() { return buffer_.size(); }
145
146 private:
148 void _buffer_message(
149 const std::shared_ptr<const goby::middleware::protobuf::SerializerTransporterMessage>& msg);
150 void _receive(const goby::acomms::protobuf::ModemTransmission& rx_msg);
151 void _forward_subscription(intervehicle::protobuf::Subscription subscription);
152 void _accept_subscription(const intervehicle::protobuf::Subscription& subscription);
153 void _expire_value(const goby::time::SteadyClock::time_point now,
156
157 subbuffer_id_type _create_buffer_id(unsigned dccl_id, unsigned group);
158
160 _create_buffer_id(const goby::middleware::protobuf::SerializerTransporterKey& key)
161 {
162 return _create_buffer_id(detail::DCCLSerializerParserHelperBase::id(key.type()),
163 key.group_numeric());
164 }
165
166 subbuffer_id_type _create_buffer_id(const intervehicle::protobuf::Subscription& subscription)
167 {
168 return _create_buffer_id(subscription.dccl_id(), subscription.group());
169 }
170
171 void _try_create_or_update_buffer(modem_id_type dest_id, const subbuffer_id_type& buffer_id);
172
173 modem_id_type _broadcast_id() { return this->cfg().modem_id() & this->cfg().subnet_mask(); }
174
175 // id within subnet
176 modem_id_type _id_within_subnet(modem_id_type id) { return id - _broadcast_id(); }
177
178 // full id
179 modem_id_type _full_id(modem_id_type id_in_subnet) { return id_in_subnet + _broadcast_id(); }
180
181 bool _dest_is_in_subnet(modem_id_type dest_id)
182 {
183 bool dest_in_subnet =
184 (dest_id & this->cfg().subnet_mask()) == (this->cfg().modem_id() & this->cfg().subnet_mask());
185 if (!dest_in_subnet)
187 << "Dest: " << dest_id
188 << " is not in subnet (our id: " << this->cfg().modem_id()
189 << ", mask: " << this->cfg().subnet_mask() << ")" << std::endl;
190
191 return dest_in_subnet;
192 }
193
194 void _publish_subscription_report(const intervehicle::protobuf::Subscription& changed);
195
196 private:
197 std::unique_ptr<InterThreadTransporter> interthread_;
198 std::unique_ptr<InterProcessForwarder<InterThreadTransporter, ImplementationTag>> interprocess_;
199
200 std::multimap<subbuffer_id_type, goby::middleware::protobuf::SerializerTransporterKey>
201 publisher_buffer_cfg_;
202
203 std::map<modem_id_type, std::multimap<subbuffer_id_type, intervehicle::protobuf::Subscription>>
204 subscriber_buffer_cfg_;
205
206 std::map<subbuffer_id_type, std::set<modem_id_type>> subbuffers_created_;
207
209 std::set<modem_id_type> subscription_subbuffers_;
210
212
213 using frame_type = int;
214 std::map<frame_type, std::vector<goby::acomms::DynamicBuffer<buffer_data_type>::Value>>
215 pending_ack_;
216
217 std::unique_ptr<goby::acomms::ModemDriverBase> driver_;
219
220 std::string glog_group_;
221
222 goby::time::SteadyClock::time_point next_modem_report_time_;
223 const goby::time::SteadyClock::duration modem_report_interval_;
224};
225
226} // namespace intervehicle
227} // namespace middleware
228} // namespace goby
229
230#endif
Represents a time-dependent priority queue for several groups of messages (multiple DynamicSubBuffers...
provides an API to the goby-acomms MAC library. MACManager is essentially a std::list<protobuf::Modem...
Definition mac_manager.h:50
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:60
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition publisher.h:40
const goby::middleware::protobuf::TransporterConfig & cfg() const
Returns the metadata configuration.
Definition publisher.h:81
Represents a thread of execution within the Goby middleware, interleaving periodic events (loop()) wi...
Definition thread.h:61
Provides the modem driver thread used by InterVehiclePortal, templated on ImplementationTag so it use...
ModemDriverThread(const intervehicle::protobuf::PortalConfig::LinkConfig &cfg)
goby::acomms::DynamicBuffer< buffer_data_type >::modem_id_type modem_id_type
goby::acomms::DynamicBuffer< buffer_data_type >::subbuffer_id_type subbuffer_id_type
const ::goby::middleware::protobuf::SerializerTransporterKey & key() const
goby::util::logger::GroupSetter group(std::string n)
bool operator==(const TransporterConfig &a, const TransporterConfig &b)
std::shared_ptr< goby::middleware::protobuf::SerializerTransporterMessage > serialize_publication(const Data &d, const Group &group, const Publisher< Data > &publisher)
bool operator<(const TCPEndPoint &ep_a, const TCPEndPoint &ep_b)
size_t data_size(const SerializerTransporterMessage &msg)
bool operator==(const TCPEndPoint &ep_a, const TCPEndPoint &ep_b)
The global namespace for the Goby project.
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::PROTOBUF_NAMESPACE_ID::MessageOptions, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
util::FlexOstream glog
Access the Goby logger through this object.
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
Definition interface.h:98
static unsigned id(CharIterator begin, CharIterator end)
std::chrono::time_point< SteadyClock > time_point
std::chrono::microseconds duration
Duration type.