Goby3 3.2.3
2025.05.13
Loading...
Searching...
No Matches
serialization_handlers.h
Go to the documentation of this file.
1// Copyright 2016-2023:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6// James D. Turner <james.turner@nrl.navy.mil>
7//
8//
9// This file is part of the Goby Underwater Autonomy Project Libraries
10// ("The Goby Libraries").
11//
12// The Goby Libraries are free software: you can redistribute them and/or modify
13// them under the terms of the GNU Lesser General Public License as published by
14// the Free Software Foundation, either version 2.1 of the License, or
15// (at your option) any later version.
16//
17// The Goby Libraries are distributed in the hope that they will be useful,
18// but WITHOUT ANY WARRANTY; without even the implied warranty of
19// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20// GNU Lesser General Public License for more details.
21//
22// You should have received a copy of the GNU Lesser General Public License
23// along with Goby. If not, see <http://www.gnu.org/licenses/>.
24
25#ifndef GOBY_MIDDLEWARE_TRANSPORT_SERIALIZATION_HANDLERS_H
26#define GOBY_MIDDLEWARE_TRANSPORT_SERIALIZATION_HANDLERS_H
27
28#include <chrono>
29#include <memory>
30#include <regex>
31#include <thread>
32#include <unordered_map>
33
34#include "goby/exception.h"
35#include "goby/util/binary.h"
36
41
42#include "interface.h"
43#include "null.h"
44
45namespace goby
46{
47namespace middleware
48{
// Primary template of the post()-signature selector. It is intentionally
// empty: the second parameter `Enable` exists only so that the partial
// specializations in this file can be chosen with std::enable_if_t depending
// on whether `Metadata` is void.
50template <typename Metadata, typename Enable = void> class SerializationHandlerPostSelector
51{
52};
53
// Partial specialization used when `Metadata` is void: declares the pure
// virtual post() overloads that take only a byte range [b, e).
// NOTE(review): the class-name line (listing line 56) is missing from this
// extract; only the enable_if argument of the specialization is visible.
55template <typename Metadata>
57 typename std::enable_if_t<std::is_void<Metadata>::value>>
58{
59 public:
62
    // One overload per supported iterator kind; each returns an iterator of
    // the same kind as its arguments.
63 virtual std::string::const_iterator post(std::string::const_iterator b,
64 std::string::const_iterator e) const = 0;
    // The std::vector<char> overload is compiled out when _LIBCPP_VERSION is
    // defined. NOTE(review): presumably because the iterator types coincide
    // with another overload on libc++ - confirm.
65#ifndef _LIBCPP_VERSION
66 virtual std::vector<char>::const_iterator post(std::vector<char>::const_iterator b,
67 std::vector<char>::const_iterator e) const = 0;
68#endif
69 virtual const char* post(const char* b, const char* e) const = 0;
70};
71
// Partial specialization used when `Metadata` is NOT void: declares the pure
// virtual post() overloads that take a byte range [b, e) plus a
// `const Metadata&` argument.
// NOTE(review): the class-name line (listing line 74) is missing from this
// extract; only the enable_if argument of the specialization is visible.
73template <typename Metadata>
75 typename std::enable_if_t<!std::is_void<Metadata>::value>>
76{
77 public:
80
    // One overload per supported iterator kind; each returns an iterator of
    // the same kind as its arguments.
81 virtual std::string::const_iterator post(std::string::const_iterator b,
82 std::string::const_iterator e,
83 const Metadata& metadata) const = 0;
    // The std::vector<char> overload is compiled out when _LIBCPP_VERSION is
    // defined. NOTE(review): presumably because the iterator types coincide
    // with another overload on libc++ - confirm.
84#ifndef _LIBCPP_VERSION
85 virtual std::vector<char>::const_iterator post(std::vector<char>::const_iterator b,
86 std::vector<char>::const_iterator e,
87 const Metadata& metadata) const = 0;
88#endif
89 virtual const char* post(const char* b, const char* e, const Metadata& metadata) const = 0;
90};
91
// Abstract base for serialized-data handlers. It exposes the type name,
// subscribed group, marshalling scheme and action of a handler, and records
// the constructing thread. `Metadata` defaults to void.
// NOTE(review): the class-name/base-class line (listing line 96) is missing
// from this extract.
95template <typename Metadata = void>
97{
98 public:
100 virtual ~SerializationHandlerBase() = default;
101
    // Accessors every concrete handler must provide.
102 virtual const std::string& type_name() const = 0;
103 virtual const Group& subscribed_group() const = 0;
104
105 virtual int scheme() const = 0;
106
    // Enumeration of handler actions (its declaration line, listing line 107,
    // and the enumerators on listing lines 110-111 are missing from this
    // extract; only SUBSCRIBE is visible).
108 {
109 SUBSCRIBE,
112 };
113 virtual SubscriptionAction action() const = 0;
114
    // thread_id() is non-virtual; subscriber_id() is virtual so that a derived
    // class can substitute its own identifier.
115 std::thread::id thread_id() const { return thread_id_; }
116 virtual std::string subscriber_id() const { return subscriber_id_; }
117
118 private:
    // Captured once, at construction, from the constructing thread.
119 const std::thread::id thread_id_{std::this_thread::get_id()};
    // String form of thread_id_; relies on thread_id_ being declared (and thus
    // initialized) first.
120 const std::string subscriber_id_{goby::middleware::thread_id(thread_id_)};
121};
122
// Equality of two handlers `s1` and `s2`: true only when scheme, type name,
// subscribed group and action all match. Thread/subscriber ids are not
// compared.
// NOTE(review): the signature (listing lines 124-125) is missing from this
// extract; presumably this is operator== on SerializationHandlerBase - confirm.
123template <typename Metadata>
126{
127 return s1.scheme() == s2.scheme() && s1.type_name() == s2.type_name() &&
128 s1.subscribed_group() == s2.subscribed_group() && s1.action() == s2.action();
129}
130
// Subscription handler for a concrete `Data` type marshalled with `scheme_id`:
// parses an incoming byte range and passes the parsed message to a user
// callback when the message's group matches the subscribed group.
// NOTE(review): the class-name line (listing line 136), the constructor
// signature (141-142) and action() (168-171) are missing from this extract.
135template <typename Data, int scheme_id>
137{
138 public:
    // User callback; receives the parsed message as shared_ptr<const Data>.
139 typedef std::function<void(std::shared_ptr<const Data> data)> HandlerType;
140
    // Constructor tail: `subscriber` defaults to a default-constructed
    // Subscriber<Data>. The type name is obtained once from
    // SerializerParserHelper and cached.
143 const Subscriber<Data>& subscriber = Subscriber<Data>())
144 : handler_(handler),
145 type_name_(SerializerParserHelper<Data, scheme_id>::type_name()),
146 group_(group),
147 subscriber_(subscriber)
148 {
149 }
150
    // All post() overloads forward to the iterator-generic _post().
151 // handle an incoming message
152 std::string::const_iterator post(std::string::const_iterator b,
153 std::string::const_iterator e) const override
154 {
155 return _post(b, e);
156 }
157
158#ifndef _LIBCPP_VERSION
159 std::vector<char>::const_iterator post(std::vector<char>::const_iterator b,
160 std::vector<char>::const_iterator e) const override
161 {
162 return _post(b, e);
163 }
164#endif
165
166 const char* post(const char* b, const char* e) const override { return _post(b, e); }
167
172
173 // getters
174 const std::string& type_name() const override { return type_name_; }
175 const Group& subscribed_group() const override { return group_; }
176 int scheme() const override { return scheme_id; }
177
178 private:
    // Parses [bytes_begin, bytes_end) and returns the `actual_end` iterator
    // filled in by the parser.
179 template <typename CharIterator>
180 CharIterator _post(CharIterator bytes_begin, CharIterator bytes_end) const
181 {
182 CharIterator actual_end;
183 auto msg = SerializerParserHelper<Data, scheme_id>::parse(bytes_begin, bytes_end,
184 actual_end, type_name_);
185
    // The callback runs only if the group computed by the subscriber for this
    // message equals the subscribed group AND a callback was supplied.
    // NOTE(review): *msg is dereferenced without a null check - confirm that
    // parse() never returns an empty pointer.
186 if (subscribed_group() == subscriber_.group(*msg) && handler_)
187 handler_(msg);
188
189 return actual_end;
190 }
191
192 private:
193 HandlerType handler_;
194 const std::string type_name_;
195 const Group group_;
196 Subscriber<Data> subscriber_;
197};
198
// Subscription handler whose post() overloads additionally receive an
// intervehicle::protobuf::Header. It derives from
// SerializationHandlerBase<intervehicle::protobuf::Header>.
// NOTE(review): the class-name line (listing line 204), the constructor
// signature (210-211) and parts of action() (242, 245) are missing from this
// extract.
203template <typename Data, int scheme_id>
205 : public SerializationHandlerBase<intervehicle::protobuf::Header>
206{
207 public:
    // User callback; receives the parsed message as shared_ptr<const Data>.
208 typedef std::function<void(std::shared_ptr<const Data> data)> HandlerType;
209
    // Constructor tail: `subscriber` defaults to a default-constructed
    // Subscriber<Data>. The type name is obtained once from
    // SerializerParserHelper and cached.
212 const Subscriber<Data>& subscriber = Subscriber<Data>())
213 : handler_(handler),
214 type_name_(SerializerParserHelper<Data, scheme_id>::type_name()),
215 group_(group),
216 subscriber_(subscriber)
217 {
218 }
219
    // All post() overloads forward to the iterator-generic _post().
220 // handle an incoming message
221 std::string::const_iterator post(std::string::const_iterator b, std::string::const_iterator e,
222 const intervehicle::protobuf::Header& header) const override
223 {
224 return _post(b, e, header);
225 }
226
227#ifndef _LIBCPP_VERSION
228 std::vector<char>::const_iterator
229 post(std::vector<char>::const_iterator b, std::vector<char>::const_iterator e,
230 const intervehicle::protobuf::Header& header) const override
231 {
232 return _post(b, e, header);
233 }
234#endif
235
236 const char* post(const char* b, const char* e,
237 const intervehicle::protobuf::Header& header) const override
238 {
239 return _post(b, e, header);
240 }
241
    // action(): the visible fragment ends in SubscriptionAction::SUBSCRIBE
    // (the return-type line and the start of the return statement are
    // missing).
243 action() const override
244 {
246 intervehicle::protobuf::Header>::SubscriptionAction::SUBSCRIBE;
247 }
248
249 // getters
250 const std::string& type_name() const override { return type_name_; }
251 const Group& subscribed_group() const override { return group_; }
252 int scheme() const override { return scheme_id; }
253
254 private:
    // Parses [bytes_begin, bytes_end) and returns the `actual_end` iterator
    // filled in by the parser.
255 template <typename CharIterator>
256 CharIterator _post(CharIterator bytes_begin, CharIterator bytes_end,
257 const intervehicle::protobuf::Header& header) const
258 {
259 CharIterator actual_end;
260 auto msg = SerializerParserHelper<Data, scheme_id>::parse(bytes_begin, bytes_end,
261 actual_end, type_name_);
262
    // The header is applied to the message before the group check, so it
    // happens whether or not the callback is subsequently invoked.
    // NOTE(review): *msg is dereferenced without a null check - confirm that
    // parse() never returns an empty pointer.
263 subscriber_.set_link_data(*msg, header);
264
    // The callback runs only if the message's group matches the subscribed
    // group AND a callback was supplied.
265 if (subscribed_group() == subscriber_.group(*msg) && handler_)
266 handler_(msg);
267
268 return actual_end;
269 }
270
271 private:
272 HandlerType handler_;
273 const std::string type_name_;
274 const Group group_;
275 Subscriber<Data> subscriber_;
276};
277
// Handler that parses a byte range into `Data` and invokes a callback with
// the parsed message and a `Metadata` value. No group filtering is performed.
// NOTE(review): the class-name line (listing line 280), the first constructor
// signature (285) and action() (316-319) are missing from this extract.
279template <typename Data, int scheme_id, typename Metadata>
281{
282 public:
    // User callback; receives the message and metadata by const reference.
283 typedef std::function<void(const Data& data, const Metadata& md)> HandlerType;
284
    // First constructor (signature missing): the type name comes from the
    // argument-less SerializerParserHelper::type_name().
286 : handler_(handler), type_name_(SerializerParserHelper<Data, scheme_id>::type_name())
287 {
288 }
289
    // Second constructor: the type name is derived from a concrete `data`
    // instance via SerializerParserHelper::type_name(data).
290 PublisherCallback(HandlerType handler, const Data& data)
291 : handler_(handler), type_name_(SerializerParserHelper<Data, scheme_id>::type_name(data))
292 {
293 }
294
    // All post() overloads forward to the iterator-generic _post().
295 // handle an incoming message
296 std::string::const_iterator post(std::string::const_iterator b, std::string::const_iterator e,
297 const Metadata& md) const override
298 {
299 return _post(b, e, md);
300 }
301
302#ifndef _LIBCPP_VERSION
303 std::vector<char>::const_iterator post(std::vector<char>::const_iterator b,
304 std::vector<char>::const_iterator e,
305 const Metadata& md) const override
306 {
307 return _post(b, e, md);
308 }
309#endif
310
311 const char* post(const char* b, const char* e, const Metadata& md) const override
312 {
313 return _post(b, e, md);
314 }
315
320
321 // getters
322 const std::string& type_name() const override { return type_name_; }
323 const Group& subscribed_group() const override { return group_; }
324 int scheme() const override { return scheme_id; }
325
326 private:
    // Parses [bytes_begin, bytes_end), calls the callback (if one was
    // supplied) with the parsed message and `md`, and returns the
    // `actual_end` iterator filled in by the parser.
327 template <typename CharIterator>
328 CharIterator _post(CharIterator bytes_begin, CharIterator bytes_end, const Metadata& md) const
329 {
330 CharIterator actual_end;
331 auto msg = SerializerParserHelper<Data, scheme_id>::parse(bytes_begin, bytes_end,
332 actual_end, type_name_);
333
    // NOTE(review): *msg is dereferenced without a null check - confirm that
    // parse() never returns an empty pointer.
334 if (handler_)
335 handler_(*msg, md);
336 return actual_end;
337 }
338
339 private:
340 HandlerType handler_;
341 const std::string type_name_;
    // Always the broadcast group; no constructor in view overrides it.
342 Group group_{Group(Group::broadcast_group)};
343};
344
// Handler object describing an unsubscription for a given `Data` type,
// `scheme_id` and group. It carries no callback: every post() overload throws.
// NOTE(review): the class-name line (listing line 350), the constructor
// signature (353) and action() (377-380) are missing from this extract; the
// name is presumably SerializationUnSubscription - confirm.
349template <typename Data, int scheme_id>
351{
352 public:
    // Constructor tail: caches the type name from SerializerParserHelper and
    // stores the group.
354 : type_name_(SerializerParserHelper<Data, scheme_id>::type_name()), group_(group)
355 {
356 }
357
    // Posting data to an unsubscription is an error: each overload throws
    // goby::Exception unconditionally and never returns.
358 std::string::const_iterator post(std::string::const_iterator b,
359 std::string::const_iterator e) const override
360 {
361 throw(goby::Exception("Cannot call post on an UnSubscription"));
362 }
363
364#ifndef _LIBCPP_VERSION
365 std::vector<char>::const_iterator post(std::vector<char>::const_iterator b,
366 std::vector<char>::const_iterator e) const override
367 {
368 throw(goby::Exception("Cannot call post on an UnSubscription"));
369 }
370#endif
371
372 const char* post(const char* b, const char* e) const override
373 {
374 throw(goby::Exception("Cannot call post on an UnSubscription"));
375 }
376
381
382 // getters
383 const std::string& type_name() const override { return type_name_; }
384 const Group& subscribed_group() const override { return group_; }
385 int scheme() const override { return scheme_id; }
386
387 private:
388 const std::string type_name_;
389 const Group group_;
390};
391
// Regex-based subscription: forwards raw message bytes to a callback when the
// message's scheme, type name and group name pass the configured filters.
// NOTE(review): the class-name line (listing line 393), the typedef name
// (398) and the first line of the `if` condition (416) are missing from this
// extract.
394{
395 public:
    // Callback signature: raw bytes, scheme, type name and group.
396 typedef std::function<void(const std::vector<unsigned char>&, int scheme,
397 const std::string& type, const Group& group)>
399
    // Both regex patterns default to ".*". The strings are converted to
    // std::regex objects when the members are initialized.
400 SerializationSubscriptionRegex(HandlerType handler, const std::set<int>& schemes,
401 const std::string& type_regex = ".*",
402 const std::string& group_regex = ".*")
403 : handler_(handler), schemes_(schemes), type_regex_(type_regex), group_regex_(group_regex)
404 {
405 }
406
    // Replace the type / group pattern of an existing subscription.
407 void update_type_regex(const std::string& type_regex) { type_regex_.assign(type_regex); }
408 void update_group_regex(const std::string& group_regex) { group_regex_.assign(group_regex); }
409
410 // handle an incoming message
411 // return true if posted
412 template <typename CharIterator>
413 bool post(CharIterator bytes_begin, CharIterator bytes_end, int scheme, const std::string& type,
414 const std::string& group) const
415 {
    // Visible part of the condition: `scheme` is in schemes_, and both type
    // and group match their regex in full (std::regex_match).
    // NOTE(review): the opening of this condition is not visible; there is
    // presumably an alternative to the schemes_.count() test - confirm.
417 schemes_.count(scheme)) &&
418 std::regex_match(type, type_regex_) && std::regex_match(group, group_regex_))
419 {
    // The byte range is copied into a vector before the callback is invoked;
    // the group name is wrapped in a DynamicGroup.
420 std::vector<unsigned char> data(bytes_begin, bytes_end);
421 handler_(data, scheme, type, goby::middleware::DynamicGroup(group));
422 return true;
423 }
424 else
425 {
426 return false;
427 }
428 }
429
430 std::thread::id thread_id() const { return thread_id_; }
431 std::string subscriber_id() const { return subscriber_id_; }
432
433 private:
434 HandlerType handler_;
435 const std::set<int> schemes_;
436 std::regex type_regex_;
437 std::regex group_regex_;
    // Captured once, at construction, from the constructing thread.
438 const std::thread::id thread_id_{std::this_thread::get_id()};
    // String form of thread_id_; relies on thread_id_ being declared first.
439 const std::string subscriber_id_{goby::middleware::thread_id(thread_id_)};
440};
441
// Small value class that only records the id of the thread that constructed
// it, together with the string form of that id.
// NOTE(review): the class-name line (listing line 443) is missing from this
// extract; presumably this is the "unsubscribe from everything for a thread"
// marker type - confirm.
444{
445 public:
446 std::thread::id thread_id() const { return thread_id_; }
447 std::string subscriber_id() const { return subscriber_id_; }
448
449 private:
    // Captured once, at construction, from the constructing thread.
450 const std::thread::id thread_id_{std::this_thread::get_id()};
    // String form of thread_id_; relies on thread_id_ being declared first.
451 const std::string subscriber_id_{goby::middleware::thread_id(thread_id_)};
452};
453
// Handler built from an intermodule::protobuf::Subscription: instead of
// parsing incoming bytes it wraps them, together with the subscription key,
// in a message that is passed to the callback.
// NOTE(review): the class-name line (listing line 455), the constructor
// signature (460-461), action() (483-494) and the declaration of `msg` (507)
// are missing from this extract.
456{
457 public:
    // Callback receiving the wrapped message.
458 typedef std::function<void(const protobuf::SerializerTransporterMessage& d)> HandlerType;
459
    // Constructor tail. group_ is initialized from sub_cfg_, which is safe
    // because sub_cfg_ is declared before group_ in the member list. The
    // subscriber id is taken from the subscription (sub.id()), not from the
    // constructing thread.
462 : handler_(handler), sub_cfg_(sub), group_(sub_cfg_.key().group()), subscriber_id_(sub.id())
463 {
464 }
465
    // All post() overloads forward to the iterator-generic _post().
466 // handle an incoming message
467 std::string::const_iterator post(std::string::const_iterator b,
468 std::string::const_iterator e) const override
469 {
470 return _post(b, e);
471 }
472
473#ifndef _LIBCPP_VERSION
474 std::vector<char>::const_iterator post(std::vector<char>::const_iterator b,
475 std::vector<char>::const_iterator e) const override
476 {
477 return _post(b, e);
478 }
479#endif
480
481 const char* post(const char* b, const char* e) const override { return _post(b, e); }
482
495
    // Type name and scheme are read from the stored subscription key.
496 // getters
497 const std::string& type_name() const override { return sub_cfg_.key().type(); }
498 const Group& subscribed_group() const override { return group_; }
499 int scheme() const override { return sub_cfg_.key().marshalling_scheme(); }
500
    // Overrides the base implementation to return the id taken from the
    // subscription.
501 std::string subscriber_id() const override { return subscriber_id_; }
502
503 private:
    // Copies [bytes_begin, bytes_end) into a string, attaches it and the
    // subscription key to `msg`, and invokes the callback. The whole range is
    // treated as consumed: bytes_end is returned.
504 template <typename CharIterator>
505 CharIterator _post(CharIterator bytes_begin, CharIterator bytes_end) const
506 {
    // NOTE(review): raw `new`; set_allocated_data() presumably takes ownership
    // (protobuf convention) - confirm. If the key assignment below were to
    // throw before that call, the string would leak.
508 std::string* sbytes = new std::string(bytes_begin, bytes_end);
509 *msg.mutable_key() = sub_cfg_.key();
510 msg.set_allocated_data(sbytes);
    // NOTE(review): handler_ is invoked without checking that it is non-empty,
    // unlike the other handler classes in this file.
511 handler_(msg);
512
513 CharIterator actual_end = bytes_end;
514 return actual_end;
515 }
516
517 private:
518 HandlerType handler_;
519 intermodule::protobuf::Subscription sub_cfg_;
520 DynamicGroup group_;
    // NOTE(review): thread_id_ does not appear in the visible initializer
    // list, so it would be a default-constructed std::thread::id; it is not
    // read anywhere in the visible code of this class - confirm it is needed.
521 const std::thread::id thread_id_;
522 const std::string subscriber_id_;
523};
524
525} // namespace middleware
526} // namespace goby
527
528#endif
simple exception class for goby applications
Definition exception.h:35
Implementation of Group for dynamic (run-time) instantiations. Use Group directly for static (compile...
Definition group.h:120
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:60
static constexpr std::uint32_t broadcast_group
Special group number representing the broadcast group (used when no grouping is required for a given ...
Definition group.h:63
Represents a subscription to a serialized data type (intervehicle layer).
SerializationHandlerBase< intervehicle::protobuf::Header >::SubscriptionAction action() const override
const char * post(const char *b, const char *e, const intervehicle::protobuf::Header &header) const override
std::vector< char >::const_iterator post(std::vector< char >::const_iterator b, std::vector< char >::const_iterator e, const intervehicle::protobuf::Header &header) const override
std::string::const_iterator post(std::string::const_iterator b, std::string::const_iterator e, const intervehicle::protobuf::Header &header) const override
std::function< void(std::shared_ptr< const Data > data)> HandlerType
IntervehicleSerializationSubscription(HandlerType handler, const Group &group=Group(Group::broadcast_group), const Subscriber< Data > &subscriber=Subscriber< Data >())
Represents a callback for a published data type (e.g. acked_func or expired_func)
std::function< void(const Data &data, const Metadata &md)> HandlerType
const char * post(const char *b, const char *e, const Metadata &md) const override
std::string::const_iterator post(std::string::const_iterator b, std::string::const_iterator e, const Metadata &md) const override
const std::string & type_name() const override
PublisherCallback(HandlerType handler, const Data &data)
SerializationHandlerBase< Metadata >::SubscriptionAction action() const override
const Group & subscribed_group() const override
std::vector< char >::const_iterator post(std::vector< char >::const_iterator b, std::vector< char >::const_iterator e, const Metadata &md) const override
Base class for handling posting callbacks for serialized data types (interprocess and outer)
virtual const std::string & type_name() const =0
virtual SubscriptionAction action() const =0
virtual const Group & subscribed_group() const =0
virtual std::vector< char >::const_iterator post(std::vector< char >::const_iterator b, std::vector< char >::const_iterator e) const =0
virtual std::string::const_iterator post(std::string::const_iterator b, std::string::const_iterator e) const =0
virtual std::vector< char >::const_iterator post(std::vector< char >::const_iterator b, std::vector< char >::const_iterator e, const Metadata &metadata) const =0
virtual std::string::const_iterator post(std::string::const_iterator b, std::string::const_iterator e, const Metadata &metadata) const =0
virtual const char * post(const char *b, const char *e, const Metadata &metadata) const =0
Selector class for enabling SerializationHandlerBase::post() override signature based on whether the ...
Represents a(n) (un)subscription from an InterModuleForwarder.
SerializationInterModuleSubscription(HandlerType handler, const intermodule::protobuf::Subscription sub)
std::string::const_iterator post(std::string::const_iterator b, std::string::const_iterator e) const override
const char * post(const char *b, const char *e) const override
SerializationHandlerBase ::SubscriptionAction action() const override
std::vector< char >::const_iterator post(std::vector< char >::const_iterator b, std::vector< char >::const_iterator e) const override
std::function< void(const protobuf::SerializerTransporterMessage &d)> HandlerType
Represents a regex subscription to a serialized data type (interprocess and outer layers).
SerializationSubscriptionRegex(HandlerType handler, const std::set< int > &schemes, const std::string &type_regex=".*", const std::string &group_regex=".*")
void update_type_regex(const std::string &type_regex)
void update_group_regex(const std::string &group_regex)
std::function< void(const std::vector< unsigned char > &, int scheme, const std::string &type, const Group &group)> HandlerType
bool post(CharIterator bytes_begin, CharIterator bytes_end, int scheme, const std::string &type, const std::string &group) const
Represents a subscription to a serialized data type (interprocess layer).
SerializationSubscription(HandlerType handler, const Group &group=Group(Group::broadcast_group), const Subscriber< Data > &subscriber=Subscriber< Data >())
const std::string & type_name() const override
SerializationHandlerBase ::SubscriptionAction action() const override
std::function< void(std::shared_ptr< const Data > data)> HandlerType
std::vector< char >::const_iterator post(std::vector< char >::const_iterator b, std::vector< char >::const_iterator e) const override
const char * post(const char *b, const char *e) const override
std::string::const_iterator post(std::string::const_iterator b, std::string::const_iterator e) const override
Represents an unsubscription to all subscribed data for a given thread.
Represents an unsubscription to a serialized data type (interprocess and outer layers).
std::vector< char >::const_iterator post(std::vector< char >::const_iterator b, std::vector< char >::const_iterator e) const override
std::string::const_iterator post(std::string::const_iterator b, std::string::const_iterator e) const override
const std::string & type_name() const override
const char * post(const char *b, const char *e) const override
SerializationHandlerBase ::SubscriptionAction action() const override
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
Definition subscriber.h:37
::goby::middleware::intermodule::protobuf::Subscription_Action action() const
const ::goby::middleware::protobuf::SerializerTransporterKey & key() const
goby::util::logger::GroupSetter group(std::string n)
bool operator==(const Group &a, const Group &b)
Definition group.h:106
constexpr int scheme()
Placeholder to provide an interface for the scheme() function family.
Definition cstr.h:65
std::string thread_id(std::thread::id i=std::this_thread::get_id())
Definition common.h:53
The global namespace for the Goby project.
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::PROTOBUF_NAMESPACE_ID::MessageOptions, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
STL namespace.
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
Definition interface.h:98
static std::shared_ptr< DataType > parse(CharIterator bytes_begin, CharIterator bytes_end, CharIterator &actual_end, const std::string &type=type_name())
Given a beginning and end iterator to bytes, parse the data and return it.
Definition interface.h:129