Goby3 3.2.3
2025.05.13
Loading...
Searching...
No Matches
interprocess.h
Go to the documentation of this file.
1// Copyright 2016-2024:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6// Ryan Govostes <rgovostes+git@gmail.com>
7//
8//
9// This file is part of the Goby Underwater Autonomy Project Libraries
10// ("The Goby Libraries").
11//
12// The Goby Libraries are free software: you can redistribute them and/or modify
13// them under the terms of the GNU Lesser General Public License as published by
14// the Free Software Foundation, either version 2.1 of the License, or
15// (at your option) any later version.
16//
17// The Goby Libraries are distributed in the hope that they will be useful,
18// but WITHOUT ANY WARRANTY; without even the implied warranty of
19// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20// GNU Lesser General Public License for more details.
21//
22// You should have received a copy of the GNU Lesser General Public License
23// along with Goby. If not, see <http://www.gnu.org/licenses/>.
24
25#ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERPROCESS_H
26#define GOBY_MIDDLEWARE_TRANSPORT_INTERPROCESS_H
27
28#include <atomic>
29#include <functional>
30#include <sys/types.h>
31#include <thread>
32#include <tuple>
33#include <unistd.h>
34
36
41
42namespace goby
43{
44namespace middleware
45{
50template <typename Derived, typename InnerTransporter>
52 : public StaticTransporterInterface<InterProcessTransporterBase<Derived, InnerTransporter>,
53 InnerTransporter>,
54 public Poller<InterProcessTransporterBase<Derived, InnerTransporter>>
55{
56 using InterfaceType =
58 InnerTransporter>;
59
61
62 public:
65 {
66 }
68
70
// Returns the marshalling scheme id used for `Data` on this layer, starting from the
// library-wide default goby::middleware::scheme<Data>().
// NOTE(review): the inline comment below describes a DCCL -> PROTOBUF substitution, but the
// statement that performs it (listing lines 79-80) is absent from this extract — confirm
// against the upstream header before relying on it.
75 template <typename Data> static constexpr int scheme()
76 {
77 int scheme = goby::middleware::scheme<Data>();
78 // if default returns DCCL, use PROTOBUF instead
81 return scheme;
82 }
83
// Publish `data` on a run-time defined group (const reference variant).
//   data      - message to publish
//   group     - group to publish on
//   publisher - optional publication metadata/callbacks (default-constructed if omitted)
// The message is first handed to the derived class's _publish() (CRTP downcast) and then
// published on the inner transporter with the same scheme, group and publisher.
// NOTE(review): listing line 95 is absent from this extract — confirm against upstream.
91 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
92 void publish_dynamic(const Data& data, const Group& group,
93 const Publisher<Data>& publisher = Publisher<Data>())
94 {
96 static_cast<Derived*>(this)->template _publish<Data, scheme>(data, group, publisher);
97 this->inner().template publish_dynamic<Data, scheme>(data, group, publisher);
98 }
99
// Publish `data` on a run-time defined group (shared pointer to const data variant).
// A null `data` pointer is ignored: nothing is published on this layer or the inner one.
// Otherwise the pointee is passed to the derived class's _publish() and the shared pointer
// itself is passed on to the inner transporter.
// NOTE(review): listing line 113 is absent from this extract — confirm against upstream.
107 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
108 void publish_dynamic(std::shared_ptr<const Data> data, const Group& group,
109 const Publisher<Data>& publisher = Publisher<Data>())
110 {
111 if (data)
112 {
114 static_cast<Derived*>(this)->template _publish<Data, scheme>(*data, group, publisher);
115 this->inner().template publish_dynamic<Data, scheme>(data, group, publisher);
116 }
117 }
118
// Publish `data` on a run-time defined group (shared pointer to mutable data variant).
// Converts the pointer to std::shared_ptr<const Data> and delegates to the
// shared-pointer-to-const overload, so null handling is whatever that overload does.
126 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
127 void publish_dynamic(std::shared_ptr<Data> data, const Group& group,
128 const Publisher<Data>& publisher = Publisher<Data>())
129 {
130 publish_dynamic<Data, scheme>(std::shared_ptr<const Data>(data), group, publisher);
131 }
132
// Publish a message that has already been serialized for the given scheme.
//   type_name - type name of the serialized message
//   scheme    - marshalling scheme id the bytes were serialized with
//   bytes     - serialized payload
// Forwards all arguments to the derived class's _publish_serialized().
// NOTE(review): the last parameter (listing line 135) and listing line 137 are absent from
// this extract; the page's symbol index gives the parameter as
// `const goby::middleware::Group& group` — confirm against upstream.
134 void publish_serialized(std::string type_name, int scheme, const std::vector<char>& bytes,
136 {
138 static_cast<Derived*>(this)->_publish_serialized(type_name, scheme, bytes, group);
139 }
140
// Subscribe to a run-time defined group and data type (const reference variant).
// The callback `f` is wrapped in a lambda (capturing `f` by copy) that dereferences the
// received shared pointer, and that lambda is registered via the derived class's _subscribe().
// NOTE(review): listing line 152 is absent from this extract — confirm against upstream.
148 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
149 void subscribe_dynamic(std::function<void(const Data&)> f, const Group& group,
150 const Subscriber<Data>& subscriber = Subscriber<Data>())
151 {
153 static_cast<Derived*>(this)->template _subscribe<Data, scheme>(
154 [=](std::shared_ptr<const Data> d) { f(*d); }, group, subscriber);
155 }
156
// Subscribe to a run-time defined group and data type (shared pointer variant).
// Passes the callback, group and subscriber straight to the derived class's _subscribe().
// NOTE(review): listing line 168 is absent from this extract — confirm against upstream.
164 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
165 void subscribe_dynamic(std::function<void(std::shared_ptr<const Data>)> f, const Group& group,
166 const Subscriber<Data>& subscriber = Subscriber<Data>())
167 {
169 static_cast<Derived*>(this)->template _subscribe<Data, scheme>(f, group, subscriber);
170 }
171
// Unsubscribe from a run-time defined group and data type by delegating to the derived
// class's _unsubscribe().
// NOTE(review): the first signature line (listing line 178) and listing line 181 are absent
// from this extract; the page's symbol index gives the signature as
// `void unsubscribe_dynamic(const Group& group, const Subscriber<Data>& subscriber = ...)`.
177 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
179 const Subscriber<Data>& subscriber = Subscriber<Data>())
180 {
182 static_cast<Derived*>(this)->template _unsubscribe<Data, scheme>(group, subscriber);
183 }
184
// Unsubscribe from all current subscriptions by delegating to the derived class's
// _unsubscribe_all().
186 void unsubscribe_all() { static_cast<Derived*>(this)->_unsubscribe_all(); }
187
// Subscribe to multiple groups and/or types at once using regular expressions.
//   f           - callback receiving the raw bytes, scheme id, type name and group
//   schemes     - set of marshalling scheme ids to subscribe to
//   type_regex  - regular expression for the type name (default ".*")
//   group_regex - regular expression for the group name (default ".*")
// Returns whatever the derived class's _subscribe_regex() returns for this subscription.
195 std::shared_ptr<SerializationSubscriptionRegex>
196 subscribe_regex(std::function<void(const std::vector<unsigned char>&, int scheme,
197 const std::string& type, const Group& group)>
198 f,
199 const std::set<int>& schemes, const std::string& type_regex = ".*",
200 const std::string& group_regex = ".*")
201 {
202 return static_cast<Derived*>(this)->_subscribe_regex(f, schemes, type_regex, group_regex);
203 }
204
// Subscribe to a number of types within one run-time group and one scheme, selecting the
// types with a regular expression.
//   f          - callback receiving the parsed message and its type name
//   group      - the single group to subscribe to (matched exactly, not as a regex)
//   type_regex - regular expression for the type name (default ".*")
// Returns the subscription handle produced by the derived class's _subscribe_regex().
214 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
215 std::shared_ptr<SerializationSubscriptionRegex> subscribe_type_regex(
216 std::function<void(std::shared_ptr<const Data>, const std::string& type)> f,
217 const Group& group, const std::string& type_regex = ".*")
218 {
// Backslash-escape every regex metacharacter in the group name ("$&" is the whole match)
// so the group can be embedded literally in the group regex built below.
219 std::regex special_chars{R"([-[\]{}()*+?.,\^$|#\s])"};
220 std::string sanitized_group =
221 std::regex_replace(std::string(group), special_chars, R"(\$&)");
222
// Adapter from the raw-bytes regex callback to the typed callback `f`: parse the bytes as
// `Data` using this overload's compile-time `scheme`. The run-time `schm` and `grp`
// arguments are not used here.
223 auto regex_lambda = [=](const std::vector<unsigned char>& data, int schm,
224 const std::string& type, const Group& grp) {
225 auto data_begin = data.begin(), data_end = data.end(), actual_end = data.end();
226 auto msg =
227 SerializerParserHelper<Data, scheme>::parse(data_begin, data_end, actual_end, type);
228 f(msg, type);
229 };
230
// Anchor the escaped group with ^...$ so only that exact group name matches.
231 return static_cast<Derived*>(this)->_subscribe_regex(regex_lambda, {scheme}, type_regex,
232 "^" + sanitized_group + "$");
233 }
234
// Static-group variant of subscribe_type_regex: the group is a template argument and the
// call is forwarded to the run-time-group overload. The returned handle is discarded.
// NOTE(review): the declaration line (listing line 245) is absent from this extract; the
// page's symbol index gives it as `void subscribe_type_regex(`.
// NOTE(review): the forwarding call below does not pass <Data, scheme> explicitly, so a
// non-default `scheme` given to this overload does not appear to reach the run-time
// overload (which would fall back to its own default) — confirm whether that is intended.
243 template <const Group& group, typename Data,
244 int scheme = InterProcessTransporterBase::scheme<Data>()>
246 std::function<void(std::shared_ptr<const Data>, const std::string& type)> f,
247 const std::string& type_regex = ".*")
248 {
249 subscribe_type_regex(f, group, type_regex);
250 }
251
// Compile-time check that a static Group can be used on the InterProcess layer: the
// group's string must be non-null and non-empty, otherwise compilation fails with the
// message below.
255 template <const Group& group> void check_validity()
256 {
257 static_assert((group.c_str() != nullptr) && (group.c_str()[0] != '\0'),
258 "goby::middleware::Group must have non-zero length string to publish on the "
259 "InterProcess layer");
260 }
261
// Run-time check that a Group can be used on the InterProcess layer.
// Throws goby::Exception if the group's string is null or empty.
// NOTE(review): the signature (listing line 263) is absent from this extract; the page's
// symbol index gives it as `void check_validity_runtime(const Group& group)`.
264 {
265 if ((group.c_str() == nullptr) || (group.c_str()[0] == '\0'))
266 throw(goby::Exception("Group must have a non-empty string for use on InterProcess"));
267 }
268
269 protected:
// Groups used internally on the inner transporter.
// to_portal_group_: forwarders publish publications, (un)subscriptions and regex
// subscriptions on it, and the portal base subscribes to it.
// regex_group_: forwarders publish regex-matched data on it.
// from_portal_group_: not referenced in the visible part of this listing.
270 static constexpr Group to_portal_group_{"goby::middleware::interprocess::to_portal"};
271 static constexpr Group regex_group_{"goby::middleware::interprocess::regex"};
272 static constexpr Group from_portal_group_{"goby::middleware::interprocess::from_portal"};
273
274 private:
275 friend PollerType;
// Poll hook (private; PollerType is befriended just above): forwards the lock to the
// derived class's _poll() and returns its result.
276 int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock)
277 {
278 return static_cast<Derived*>(this)->_poll(lock);
279 }
280};
281
282template <typename Derived, typename InnerTransporter>
285template <typename Derived, typename InnerTransporter>
288template <typename Derived, typename InnerTransporter>
291
296template <typename InnerTransporter>
298 : public InterProcessTransporterBase<InterProcessForwarder<InnerTransporter>, InnerTransporter>
299{
300 public:
301 using Base =
303
// Construct a forwarder for the interprocess layer on top of `inner`.
// Registers a callback on the inner transporter that hands each received
// SerializerTransporterMessage to _receive_regex_data_forwarded(). The callback captures
// `this`.
// NOTE(review): the call that registers the callback (listing lines 310-311) is absent from
// this extract — presumably a subscribe on Base::regex_group_; confirm against upstream.
307 InterProcessForwarder(InnerTransporter& inner) : Base(inner)
308 {
309 this->inner()
312 [this](
313 std::shared_ptr<const goby::middleware::protobuf::SerializerTransporterMessage>
314 msg) { _receive_regex_data_forwarded(msg); });
315 }
// Destructor body: drops all subscriptions, then sleeps before the object (and its access
// to inner()) goes away, as a stop-gap for the missing handshake described in the TODO.
// NOTE(review): the declaration (listing line 316) is absent from this extract.
// NOTE(review): `1e5` is a double literal implicitly converted to usleep()'s integer
// argument: 100000 microseconds (0.1 s). An integer literal or a std::chrono sleep would
// state the unit and avoid the conversion.
317 {
318 this->unsubscribe_all();
319
320 // TODO - remove by adding in an explicit handshake with the unsubscribe_all publication so that we don't delete ourself (and thus our inner()) before the Portal has deleted all the subscriptions
321 usleep(1e5);
322 }
323
324 friend Base;
325
326 private:
// Serialize `d` with SerializerParserHelper<Data, scheme> and forward it toward the portal:
// the bytes, scheme, group and publisher configuration are packed into a
// SerializerTransporterMessage that is published on Base::to_portal_group_ of the inner
// transporter.
// NOTE(review): listing line 337 is absent from this extract — presumably it sets the key's
// type name; confirm against upstream.
// NOTE(review): `sbytes` is a raw `new` that is handed to the message only at
// set_allocated_data(); an exception thrown in between (e.g. from make_shared) would leak
// it. _publish_serialized() uses set_data() with a temporary string and has no such window.
327 template <typename Data, int scheme>
328 void _publish(const Data& d, const Group& group, const Publisher<Data>& publisher)
329 {
330 // create and forward publication to edge
331 std::vector<char> bytes(SerializerParserHelper<Data, scheme>::serialize(d));
332 std::string* sbytes = new std::string(bytes.begin(), bytes.end());
333 auto msg = std::make_shared<goby::middleware::protobuf::SerializerTransporterMessage>();
334 auto* key = msg->mutable_key();
335
336 key->set_marshalling_scheme(scheme);
338 key->set_group(std::string(group));
339 msg->set_allocated_data(sbytes);
340
341 *key->mutable_cfg() = publisher.cfg();
342
343 this->inner().template publish<Base::to_portal_group_>(msg);
344 }
345
// Forward an already-serialized message toward the portal: scheme, type name, group and
// bytes are packed into a SerializerTransporterMessage that is published on
// Base::to_portal_group_ of the inner transporter. No publisher configuration is set.
// NOTE(review): the last parameter (listing line 347, `group`) is absent from this extract.
346 void _publish_serialized(std::string type_name, int scheme, const std::vector<char>& bytes,
348 {
349 auto msg = std::make_shared<goby::middleware::protobuf::SerializerTransporterMessage>();
350 auto* key = msg->mutable_key();
351
352 key->set_marshalling_scheme(scheme);
353 key->set_type(type_name);
354 key->set_group(std::string(group));
355 msg->set_data(std::string(bytes.begin(), bytes.end()));
356
357 this->inner().template publish<Base::to_portal_group_>(msg);
358 }
359
// Subscribe `f` to `group` and tell the portal about the subscription.
//  1. `f` is subscribed on the inner transporter for <Data, scheme> on `group`.
//  2. A SerializationSubscription is built whose handler republishes received data on the
//     inner transporter for the same group; its Subscriber uses a default TransporterConfig
//     and a group function that always returns `group`.
//  3. That subscription object is published on Base::to_portal_group_ as a
//     SerializationHandlerBase<>.
// NOTE(review): the `subscriber` argument is not used anywhere in this body — confirm that
// dropping the caller's Subscriber settings is intended.
360 template <typename Data, int scheme>
361 void _subscribe(std::function<void(std::shared_ptr<const Data> d)> f, const Group& group,
362 const Subscriber<Data>& subscriber)
363 {
364 this->inner().template subscribe_dynamic<Data, scheme>(f, group);
365
366 // forward subscription to edge
367 auto inner_publication_lambda = [=](std::shared_ptr<const Data> d) {
368 this->inner().template publish_dynamic<Data, scheme>(d, group);
369 };
370
371 auto subscription = std::make_shared<SerializationSubscription<Data, scheme>>(
372 inner_publication_lambda, group,
373 middleware::Subscriber<Data>(goby::middleware::protobuf::TransporterConfig(),
374 [=](const Data& d) { return group; }));
375
376 this->inner().template publish<Base::to_portal_group_, SerializationHandlerBase<>>(
377 subscription);
378 }
379
// Unsubscribe <Data, scheme> from `group`: first on the inner transporter, then by
// publishing a SerializationUnSubscription for the group on Base::to_portal_group_ (as a
// SerializationHandlerBase<>).
380 template <typename Data, int scheme>
381 void _unsubscribe(const Group& group, const Subscriber<Data>& subscriber)
382 {
383 this->inner().template unsubscribe_dynamic<Data, scheme>(group, subscriber);
384
385 auto unsubscription = std::shared_ptr<SerializationHandlerBase<>>(
386 new SerializationUnSubscription<Data, scheme>(group));
387
388 this->inner().template publish<Base::to_portal_group_, SerializationHandlerBase<>>(
389 unsubscription);
390 }
391
// Drop every locally held regex subscription and publish a SerializationUnSubscribeAll on
// Base::to_portal_group_.
392 void _unsubscribe_all()
393 {
394 regex_subscriptions_.clear();
395 auto all = std::make_shared<SerializationUnSubscribeAll>();
396 this->inner().template publish<Base::to_portal_group_, SerializationUnSubscribeAll>(all);
397 }
398
// Register a regex subscription.
// Two subscription objects are created from the same schemes/type_regex/group_regex:
//  - a portal-side one, published on Base::to_portal_group_, whose handler repackages
//    matching data into a SerializerTransporterMessage and publishes it on
//    Base::regex_group_ of the inner transporter;
//  - a local one wrapping the caller's `f`, stored in regex_subscriptions_ and returned.
// NOTE(review): the line that initializes `forwarded_data` (listing line 409) is absent
// from this extract — confirm against upstream.
399 std::shared_ptr<SerializationSubscriptionRegex>
400 _subscribe_regex(std::function<void(const std::vector<unsigned char>&, int scheme,
401 const std::string& type, const Group& group)>
402 f,
403 const std::set<int>& schemes, const std::string& type_regex = ".*",
404 const std::string& group_regex = ".*")
405 {
406 auto inner_publication_lambda = [=](const std::vector<unsigned char>& data, int scheme,
407 const std::string& type, const Group& group) {
408 std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage>
410 forwarded_data->mutable_key()->set_marshalling_scheme(scheme);
411 forwarded_data->mutable_key()->set_type(type);
412 forwarded_data->mutable_key()->set_group(group);
413 forwarded_data->set_data(std::string(data.begin(), data.end()));
414 this->inner().template publish<Base::regex_group_>(forwarded_data);
415 };
416
417 auto portal_subscription = std::make_shared<SerializationSubscriptionRegex>(
418 inner_publication_lambda, schemes, type_regex, group_regex);
419 this->inner().template publish<Base::to_portal_group_, SerializationSubscriptionRegex>(
420 portal_subscription);
421
422 auto local_subscription = std::shared_ptr<SerializationSubscriptionRegex>(
423 new SerializationSubscriptionRegex(f, schemes, type_regex, group_regex));
424 regex_subscriptions_.insert(local_subscription);
425 return local_subscription;
426 }
427
// Callback for forwarded regex data: offers the message's bytes, marshalling scheme, type
// and group to every locally stored regex subscription via post().
428 void _receive_regex_data_forwarded(
429 std::shared_ptr<const goby::middleware::protobuf::SerializerTransporterMessage> msg)
430 {
431 const auto& bytes = msg->data();
432 for (auto& sub : regex_subscriptions_)
433 sub->post(bytes.begin(), bytes.end(), msg->key().marshalling_scheme(),
434 msg->key().type(), msg->key().group());
435 }
436
// Poll hook for the forwarder: always returns 0 and leaves `lock` untouched.
437 int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock)
438 {
439 return 0;
440 } // A forwarder is a shell, only the inner Transporter has data
441
442 private:
443 std::set<std::shared_ptr<const SerializationSubscriptionRegex>> regex_subscriptions_;
444};
445
// Base class for interprocess portals. On construction it subscribes, on the inner
// transporter, to everything forwarders publish on Base::to_portal_group_ and dispatches
// each item to a handler implemented by `Derived`.
// NOTE(review): listing lines 450, 453, 455 and 460 are absent from this extract. The
// `Base` alias and the unqualified `SerializerTransporterMessage` name used below are
// presumably introduced on those lines — confirm against upstream.
446template <typename Derived, typename InnerTransporter>
447class InterProcessPortalBase : public InterProcessTransporterBase<Derived, InnerTransporter>
448{
449 public:
451
// Construct the portal base on top of `inner` and install the forwarding subscriptions.
452 InterProcessPortalBase(InnerTransporter& inner) : Base(inner) { _init(); }
454
456
457 private:
// Install the four to_portal_group_ subscriptions. Every callback captures `this`.
458 void _init()
459 {
// Forwarded publications -> Derived::_receive_publication_forwarded(message)
461 this->inner().template subscribe<Base::to_portal_group_, SerializerTransporterMessage>(
462 [this](std::shared_ptr<const SerializerTransporterMessage> d) {
463 static_cast<Derived*>(this)->_receive_publication_forwarded(*d);
464 });
465
// Forwarded (un)subscription handlers -> Derived::_receive_subscription_forwarded(handler)
466 this->inner().template subscribe<Base::to_portal_group_, SerializationHandlerBase<>>(
467 [this](std::shared_ptr<const middleware::SerializationHandlerBase<>> s) {
468 static_cast<Derived*>(this)->_receive_subscription_forwarded(s);
469 });
470
// Forwarded regex subscriptions -> Derived::_receive_regex_subscription_forwarded(sub)
471 this->inner().template subscribe<Base::to_portal_group_, SerializationSubscriptionRegex>(
472 [this](std::shared_ptr<const middleware::SerializationSubscriptionRegex> s) {
473 static_cast<Derived*>(this)->_receive_regex_subscription_forwarded(s);
474 });
475
// Unsubscribe-all requests -> Derived::_unsubscribe_all(subscriber id of the requester)
476 this->inner().template subscribe<Base::to_portal_group_, SerializationUnSubscribeAll>(
477 [this](std::shared_ptr<const middleware::SerializationUnSubscribeAll> s) {
478 static_cast<Derived*>(this)->_unsubscribe_all(s->subscriber_id());
479 });
480 }
481};
482
483} // namespace middleware
484} // namespace goby
485
486#endif
simple exception class for goby applications
Definition exception.h:35
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:60
Implements the forwarder concept for the interprocess layer.
InterProcessForwarder(InnerTransporter &inner)
Construct a forwarder for the interprocess layer.
InterProcessPortalBase(InnerTransporter &inner)
Base class for implementing transporters (both portal and forwarder) for the interprocess layer.
void subscribe_dynamic(std::function< void(const Data &)> f, const Group &group, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (const reference variant)....
void check_validity()
Check validity of the Group for interprocess use (at compile time)
std::shared_ptr< SerializationSubscriptionRegex > subscribe_type_regex(std::function< void(std::shared_ptr< const Data >, const std::string &type)> f, const Group &group, const std::string &type_regex=".*")
Subscribe to a number of types within a given group and scheme using a regular expression.
void check_validity_runtime(const Group &group)
Check validity of the Group for interprocess use (for DynamicGroup at run time)
void subscribe_dynamic(std::function< void(std::shared_ptr< const Data >)> f, const Group &group, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (shared pointer variant)....
void publish_dynamic(const Data &data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (const reference variant)....
std::shared_ptr< SerializationSubscriptionRegex > subscribe_regex(std::function< void(const std::vector< unsigned char > &, int scheme, const std::string &type, const Group &group)> f, const std::set< int > &schemes, const std::string &type_regex=".*", const std::string &group_regex=".*")
Subscribe to multiple groups and/or types at once using regular expressions.
static constexpr int scheme()
returns the marshalling scheme id for a given data type on this layer
void unsubscribe_dynamic(const Group &group, const Subscriber< Data > &subscriber=Subscriber< Data >())
Unsubscribe from a specific run-time defined group and data type. Where possible, prefer the static var...
InterProcessTransporterBase(InnerTransporter &inner)
void subscribe_type_regex(std::function< void(std::shared_ptr< const Data >, const std::string &type)> f, const std::string &type_regex=".*")
Subscribe to a number of types within a given group and scheme using a regular expression.
void publish_dynamic(std::shared_ptr< const Data > data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to const data variant)....
void publish_serialized(std::string type_name, int scheme, const std::vector< char > &bytes, const goby::middleware::Group &group)
Publish a message that has already been serialized for the given scheme.
void unsubscribe_all()
Unsubscribe from all current subscriptions.
void publish_dynamic(std::shared_ptr< Data > data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to mutable data variant)....
Utility class for allowing the various Goby middleware transporters to poll the underlying transport ...
Definition poller.h:38
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition publisher.h:40
const goby::middleware::protobuf::TransporterConfig & cfg() const
Returns the metadata configuration.
Definition publisher.h:81
Defines the common interface for publishing and subscribing data using static (constexpr) groups on G...
Definition interface.h:204
void subscribe(std::function< void(const Data &)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (const reference variant)
Definition interface.h:270
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
Definition subscriber.h:37
goby::util::logger::GroupSetter group(std::string n)
The global namespace for the Goby project.
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::PROTOBUF_NAMESPACE_ID::MessageOptions, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
type
Generic JSON types used in JWTs.
Definition jwt.h:2072
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
Definition interface.h:98
static std::shared_ptr< DataType > parse(CharIterator bytes_begin, CharIterator bytes_end, CharIterator &actual_end, const std::string &type=type_name())
Given a beginning and end iterator to bytes, parse the data and return it.
Definition interface.h:129