Goby3  3.1.5a
2024.05.23
interprocess.h
Go to the documentation of this file.
1 // Copyright 2016-2024:
2 // GobySoft, LLC (2013-)
3 // Community contributors (see AUTHORS file)
4 // File authors:
5 // Toby Schneider <toby@gobysoft.org>
6 // Ryan Govostes <rgovostes+git@gmail.com>
7 //
8 //
9 // This file is part of the Goby Underwater Autonomy Project Libraries
10 // ("The Goby Libraries").
11 //
12 // The Goby Libraries are free software: you can redistribute them and/or modify
13 // them under the terms of the GNU Lesser General Public License as published by
14 // the Free Software Foundation, either version 2.1 of the License, or
15 // (at your option) any later version.
16 //
17 // The Goby Libraries are distributed in the hope that they will be useful,
18 // but WITHOUT ANY WARRANTY; without even the implied warranty of
19 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 // GNU Lesser General Public License for more details.
21 //
22 // You should have received a copy of the GNU Lesser General Public License
23 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
24 
25 #ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERPROCESS_H
26 #define GOBY_MIDDLEWARE_TRANSPORT_INTERPROCESS_H
27 
28 #include <atomic>
29 #include <functional>
30 #include <sys/types.h>
31 #include <thread>
32 #include <tuple>
33 #include <unistd.h>
34 
35 #include "goby/middleware/group.h"
36 
41 
42 namespace goby
43 {
44 namespace middleware
45 {
50 template <typename Derived, typename InnerTransporter>
52  : public StaticTransporterInterface<InterProcessTransporterBase<Derived, InnerTransporter>,
53  InnerTransporter>,
54  public Poller<InterProcessTransporterBase<Derived, InnerTransporter>>
55 {
56  using InterfaceType =
58  InnerTransporter>;
59 
61 
62  public:
64  : InterfaceType(inner), PollerType(&this->inner())
65  {
66  }
68 
70 
75  template <typename Data> static constexpr int scheme()
76  {
77  int scheme = goby::middleware::scheme<Data>();
78  // if default returns DCCL, use PROTOBUF instead
81  return scheme;
82  }
83 
91  template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
92  void publish_dynamic(const Data& data, const Group& group,
93  const Publisher<Data>& publisher = Publisher<Data>())
94  {
96  static_cast<Derived*>(this)->template _publish<Data, scheme>(data, group, publisher);
97  this->inner().template publish_dynamic<Data, scheme>(data, group, publisher);
98  }
99 
107  template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
108  void publish_dynamic(std::shared_ptr<const Data> data, const Group& group,
109  const Publisher<Data>& publisher = Publisher<Data>())
110  {
111  if (data)
112  {
114  static_cast<Derived*>(this)->template _publish<Data, scheme>(*data, group, publisher);
115  this->inner().template publish_dynamic<Data, scheme>(data, group, publisher);
116  }
117  }
118 
126  template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
127  void publish_dynamic(std::shared_ptr<Data> data, const Group& group,
128  const Publisher<Data>& publisher = Publisher<Data>())
129  {
130  publish_dynamic<Data, scheme>(std::shared_ptr<const Data>(data), group, publisher);
131  }
132 
134  void publish_serialized(std::string type_name, int scheme, const std::vector<char>& bytes,
136  {
138  static_cast<Derived*>(this)->_publish_serialized(type_name, scheme, bytes, group);
139  }
140 
148  template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
149  void subscribe_dynamic(std::function<void(const Data&)> f, const Group& group,
150  const Subscriber<Data>& subscriber = Subscriber<Data>())
151  {
153  static_cast<Derived*>(this)->template _subscribe<Data, scheme>(
154  [=](std::shared_ptr<const Data> d) { f(*d); }, group, subscriber);
155  }
156 
164  template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
165  void subscribe_dynamic(std::function<void(std::shared_ptr<const Data>)> f, const Group& group,
166  const Subscriber<Data>& subscriber = Subscriber<Data>())
167  {
169  static_cast<Derived*>(this)->template _subscribe<Data, scheme>(f, group, subscriber);
170  }
171 
177  template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
179  const Subscriber<Data>& subscriber = Subscriber<Data>())
180  {
182  static_cast<Derived*>(this)->template _unsubscribe<Data, scheme>(group, subscriber);
183  }
184 
186  void unsubscribe_all() { static_cast<Derived*>(this)->_unsubscribe_all(); }
187 
195  std::shared_ptr<SerializationSubscriptionRegex>
196  subscribe_regex(std::function<void(const std::vector<unsigned char>&, int scheme,
197  const std::string& type, const Group& group)>
198  f,
199  const std::set<int>& schemes, const std::string& type_regex = ".*",
200  const std::string& group_regex = ".*")
201  {
202  return static_cast<Derived*>(this)->_subscribe_regex(f, schemes, type_regex, group_regex);
203  }
204 
214  template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
215  std::shared_ptr<SerializationSubscriptionRegex> subscribe_type_regex(
216  std::function<void(std::shared_ptr<const Data>, const std::string& type)> f,
217  const Group& group, const std::string& type_regex = ".*")
218  {
219  std::regex special_chars{R"([-[\]{}()*+?.,\^$|#\s])"};
220  std::string sanitized_group =
221  std::regex_replace(std::string(group), special_chars, R"(\$&)");
222 
223  auto regex_lambda = [=](const std::vector<unsigned char>& data, int schm,
224  const std::string& type, const Group& grp) {
225  auto data_begin = data.begin(), data_end = data.end(), actual_end = data.end();
226  auto msg =
227  SerializerParserHelper<Data, scheme>::parse(data_begin, data_end, actual_end, type);
228  f(msg, type);
229  };
230 
231  return static_cast<Derived*>(this)->_subscribe_regex(regex_lambda, {scheme}, type_regex,
232  "^" + sanitized_group + "$");
233  }
234 
243  template <const Group& group, typename Data,
244  int scheme = InterProcessTransporterBase::scheme<Data>()>
246  std::function<void(std::shared_ptr<const Data>, const std::string& type)> f,
247  const std::string& type_regex = ".*")
248  {
249  subscribe_type_regex(f, group, type_regex);
250  }
251 
255  template <const Group& group> void check_validity()
256  {
257  static_assert((group.c_str() != nullptr) && (group.c_str()[0] != '\0'),
258  "goby::middleware::Group must have non-zero length string to publish on the "
259  "InterProcess layer");
260  }
261 
264  {
265  if ((group.c_str() == nullptr) || (group.c_str()[0] == '\0'))
266  throw(goby::Exception("Group must have a non-empty string for use on InterProcess"));
267  }
268 
269  protected:
270  static constexpr Group to_portal_group_{"goby::middleware::interprocess::to_portal"};
271  static constexpr Group regex_group_{"goby::middleware::interprocess::regex"};
272  static constexpr Group from_portal_group_{"goby::middleware::interprocess::from_portal"};
273 
274  private:
275  friend PollerType;
276  int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock)
277  {
278  return static_cast<Derived*>(this)->_poll(lock);
279  }
280 };
281 
282 template <typename Derived, typename InnerTransporter>
283 constexpr goby::middleware::Group
285 template <typename Derived, typename InnerTransporter>
286 constexpr goby::middleware::Group
288 template <typename Derived, typename InnerTransporter>
289 constexpr goby::middleware::Group
291 
296 template <typename InnerTransporter>
297 class InterProcessForwarder
298  : public InterProcessTransporterBase<InterProcessForwarder<InnerTransporter>, InnerTransporter>
299 {
300  public:
301  using Base =
303 
307  InterProcessForwarder(InnerTransporter& inner) : Base(inner)
308  {
309  this->inner()
310  .template subscribe<Base::regex_group_,
312  [this](
313  std::shared_ptr<const goby::middleware::protobuf::SerializerTransporterMessage>
314  msg) { _receive_regex_data_forwarded(msg); });
315  }
317  {
318  this->unsubscribe_all();
319 
320  // TODO - remove by adding in an explicit handshake with the unsubscribe_all publication so that we don't delete ourself (and thus our inner()) before the Portal has deleted all the subscriptions
321  usleep(1e5);
322  }
323 
324  friend Base;
325 
326  private:
327  template <typename Data, int scheme>
328  void _publish(const Data& d, const Group& group, const Publisher<Data>& publisher)
329  {
330  // create and forward publication to edge
331  std::vector<char> bytes(SerializerParserHelper<Data, scheme>::serialize(d));
332  std::string* sbytes = new std::string(bytes.begin(), bytes.end());
333  auto msg = std::make_shared<goby::middleware::protobuf::SerializerTransporterMessage>();
334  auto* key = msg->mutable_key();
335 
336  key->set_marshalling_scheme(scheme);
338  key->set_group(std::string(group));
339  msg->set_allocated_data(sbytes);
340 
341  *key->mutable_cfg() = publisher.cfg();
342 
343  this->inner().template publish<Base::to_portal_group_>(msg);
344  }
345 
346  void _publish_serialized(std::string type_name, int scheme, const std::vector<char>& bytes,
348  {
349  auto msg = std::make_shared<goby::middleware::protobuf::SerializerTransporterMessage>();
350  auto* key = msg->mutable_key();
351 
352  key->set_marshalling_scheme(scheme);
353  key->set_type(type_name);
354  key->set_group(std::string(group));
355  msg->set_data(std::string(bytes.begin(), bytes.end()));
356 
357  this->inner().template publish<Base::to_portal_group_>(msg);
358  }
359 
360  template <typename Data, int scheme>
361  void _subscribe(std::function<void(std::shared_ptr<const Data> d)> f, const Group& group,
362  const Subscriber<Data>& subscriber)
363  {
364  this->inner().template subscribe_dynamic<Data, scheme>(f, group);
365 
366  // forward subscription to edge
367  auto inner_publication_lambda = [=](std::shared_ptr<const Data> d) {
368  this->inner().template publish_dynamic<Data, scheme>(d, group);
369  };
370 
371  auto subscription = std::make_shared<SerializationSubscription<Data, scheme>>(
372  inner_publication_lambda, group,
373  middleware::Subscriber<Data>(goby::middleware::protobuf::TransporterConfig(),
374  [=](const Data& d) { return group; }));
375 
376  this->inner().template publish<Base::to_portal_group_, SerializationHandlerBase<>>(
377  subscription);
378  }
379 
380  template <typename Data, int scheme>
381  void _unsubscribe(const Group& group, const Subscriber<Data>& subscriber)
382  {
383  this->inner().template unsubscribe_dynamic<Data, scheme>(group, subscriber);
384 
385  auto unsubscription = std::shared_ptr<SerializationHandlerBase<>>(
386  new SerializationUnSubscription<Data, scheme>(group));
387 
388  this->inner().template publish<Base::to_portal_group_, SerializationHandlerBase<>>(
389  unsubscription);
390  }
391 
392  void _unsubscribe_all()
393  {
394  regex_subscriptions_.clear();
395  auto all = std::make_shared<SerializationUnSubscribeAll>();
396  this->inner().template publish<Base::to_portal_group_, SerializationUnSubscribeAll>(all);
397  }
398 
399  std::shared_ptr<SerializationSubscriptionRegex>
400  _subscribe_regex(std::function<void(const std::vector<unsigned char>&, int scheme,
401  const std::string& type, const Group& group)>
402  f,
403  const std::set<int>& schemes, const std::string& type_regex = ".*",
404  const std::string& group_regex = ".*")
405  {
406  auto inner_publication_lambda = [=](const std::vector<unsigned char>& data, int scheme,
407  const std::string& type, const Group& group) {
408  std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage>
410  forwarded_data->mutable_key()->set_marshalling_scheme(scheme);
411  forwarded_data->mutable_key()->set_type(type);
412  forwarded_data->mutable_key()->set_group(group);
413  forwarded_data->set_data(std::string(data.begin(), data.end()));
414  this->inner().template publish<Base::regex_group_>(forwarded_data);
415  };
416 
417  auto portal_subscription = std::make_shared<SerializationSubscriptionRegex>(
418  inner_publication_lambda, schemes, type_regex, group_regex);
419  this->inner().template publish<Base::to_portal_group_, SerializationSubscriptionRegex>(
420  portal_subscription);
421 
422  auto local_subscription = std::shared_ptr<SerializationSubscriptionRegex>(
423  new SerializationSubscriptionRegex(f, schemes, type_regex, group_regex));
424  regex_subscriptions_.insert(local_subscription);
425  return local_subscription;
426  }
427 
428  void _receive_regex_data_forwarded(
429  std::shared_ptr<const goby::middleware::protobuf::SerializerTransporterMessage> msg)
430  {
431  const auto& bytes = msg->data();
432  for (auto& sub : regex_subscriptions_)
433  sub->post(bytes.begin(), bytes.end(), msg->key().marshalling_scheme(),
434  msg->key().type(), msg->key().group());
435  }
436 
437  int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock)
438  {
439  return 0;
440  } // A forwarder is a shell, only the inner Transporter has data
441 
442  private:
443  std::set<std::shared_ptr<const SerializationSubscriptionRegex>> regex_subscriptions_;
444 };
445 
446 template <typename Derived, typename InnerTransporter>
447 class InterProcessPortalBase : public InterProcessTransporterBase<Derived, InnerTransporter>
448 {
449  public:
451 
452  InterProcessPortalBase(InnerTransporter& inner) : Base(inner) { _init(); }
453  InterProcessPortalBase() { _init(); }
454 
456 
457  private:
458  void _init()
459  {
461  this->inner().template subscribe<Base::to_portal_group_, SerializerTransporterMessage>(
462  [this](std::shared_ptr<const SerializerTransporterMessage> d) {
463  static_cast<Derived*>(this)->_receive_publication_forwarded(*d);
464  });
465 
466  this->inner().template subscribe<Base::to_portal_group_, SerializationHandlerBase<>>(
467  [this](std::shared_ptr<const middleware::SerializationHandlerBase<>> s) {
468  static_cast<Derived*>(this)->_receive_subscription_forwarded(s);
469  });
470 
471  this->inner().template subscribe<Base::to_portal_group_, SerializationSubscriptionRegex>(
472  [this](std::shared_ptr<const middleware::SerializationSubscriptionRegex> s) {
473  static_cast<Derived*>(this)->_receive_regex_subscription_forwarded(s);
474  });
475 
476  this->inner().template subscribe<Base::to_portal_group_, SerializationUnSubscribeAll>(
477  [this](std::shared_ptr<const middleware::SerializationUnSubscribeAll> s) {
478  static_cast<Derived*>(this)->_unsubscribe_all(s->subscriber_id());
479  });
480  }
481 };
482 
483 } // namespace middleware
484 } // namespace goby
485 
486 #endif
goby::middleware::InterProcessTransporterBase::InterProcessTransporterBase
InterProcessTransporterBase(InnerTransporter &inner)
Definition: interprocess.h:63
goby::middleware::SerializerParserHelper::parse
static std::shared_ptr< DataType > parse(CharIterator bytes_begin, CharIterator bytes_end, CharIterator &actual_end, const std::string &type=type_name())
Given a beginning and end iterator to bytes, parse the data and return it.
Definition: interface.h:129
goby::middleware::StaticTransporterInterface
Defines the common interface for publishing and subscribing data using static (constexpr) groups on G...
Definition: interface.h:203
goby::middleware::protobuf::SerializerTransporterMessage
Definition: serializer_transporter.pb.h:462
interface.h
goby
The global namespace for the Goby project.
Definition: acomms_constants.h:33
goby::middleware::SerializerParserHelper
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
Definition: interface.h:97
goby::middleware::InterProcessForwarder::~InterProcessForwarder
virtual ~InterProcessForwarder()
Definition: interprocess.h:316
goby::middleware::Subscriber
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
Definition: subscriber.h:36
group.h
goby::middleware::InterProcessTransporterBase::publish_serialized
void publish_serialized(std::string type_name, int scheme, const std::vector< char > &bytes, const goby::middleware::Group &group)
Publish a message that has already been serialized for the given scheme.
Definition: interprocess.h:134
goby::middleware::Poller
Utility class for allowing the various Goby middleware transporters to poll the underlying transport ...
Definition: poller.h:37
group
goby::util::logger::GroupSetter group(std::string n)
Definition: logger_manipulators.h:134
goby::middleware::Publisher
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition: driver_thread.h:69
goby::middleware::MarshallingScheme::PROTOBUF
@ PROTOBUF
Definition: interface.h:53
goby::middleware::InterProcessTransporterBase::regex_group_
static constexpr Group regex_group_
Definition: interprocess.h:271
goby::middleware::InterProcessPortalBase::~InterProcessPortalBase
virtual ~InterProcessPortalBase()
Definition: interprocess.h:455
goby::middleware::InterProcessTransporterBase::subscribe_type_regex
std::shared_ptr< SerializationSubscriptionRegex > subscribe_type_regex(std::function< void(std::shared_ptr< const Data >, const std::string &type)> f, const Group &group, const std::string &type_regex=".*")
Subscribe to a number of types within a given group and scheme using a regular expression.
Definition: interprocess.h:215
goby::middleware::protobuf::TransporterConfig
Definition: transporter_config.pb.h:74
goby::util::logger_lock::lock
@ lock
Definition: flex_ostreambuf.h:62
goby::middleware::InterProcessTransporterBase::check_validity
void check_validity()
Check validity of the Group for interthread use (at compile time)
Definition: interprocess.h:255
goby::middleware::InterProcessTransporterBase::publish_dynamic
void publish_dynamic(const Data &data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (const reference variant)....
Definition: interprocess.h:92
goby::middleware::InterProcessTransporterBase::scheme
static constexpr int scheme()
returns the marshalling scheme id for a given data type on this layer
Definition: interprocess.h:75
null.h
goby::middleware::InterProcessForwarder::Base
friend Base
Definition: interprocess.h:324
goby::middleware::InterProcessTransporterBase
Base class for implementing transporters (both portal and forwarder) for the interprocess layer.
Definition: interprocess.h:51
goby::middleware::InterProcessPortalBase::InterProcessPortalBase
InterProcessPortalBase()
Definition: interprocess.h:453
goby::middleware::InterProcessTransporterBase::publish_dynamic
void publish_dynamic(std::shared_ptr< Data > data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to mutable data variant)....
Definition: interprocess.h:127
goby::middleware::InterProcessTransporterBase::to_portal_group_
static constexpr Group to_portal_group_
Definition: interprocess.h:270
goby::middleware::InterProcessTransporterBase::subscribe_dynamic
void subscribe_dynamic(std::function< void(std::shared_ptr< const Data >)> f, const Group &group, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (shared pointer variant)....
Definition: interprocess.h:165
goby::middleware::Publisher::cfg
const goby::middleware::protobuf::TransporterConfig & cfg() const
Returns the metadata configuration.
Definition: publisher.h:81
goby::middleware::InterProcessForwarder::Base
InterProcessTransporterBase< InterProcessForwarder< InnerTransporter >, InnerTransporter > Base
Definition: interprocess.h:302
jwt::json::type
type
Generic JSON types used in JWTs.
Definition: jwt.h:2071
goby::msg
extern ::google::protobuf::internal::ExtensionIdentifier< ::google::protobuf::MessageOptions, ::google::protobuf::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
Definition: option_extensions.pb.h:1327
goby::middleware::InnerTransporterInterface< InterProcessTransporterBase< Derived, InnerTransporter >, InnerTransporter >::inner
InnerTransporter & inner()
Definition: interface.h:63
goby::middleware::Group
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition: group.h:59
goby::middleware::InterProcessTransporterBase::unsubscribe_all
void unsubscribe_all()
Unsubscribe from all current subscriptions.
Definition: interprocess.h:186
goby::middleware::InterProcessTransporterBase::subscribe_dynamic
void subscribe_dynamic(std::function< void(const Data &)> f, const Group &group, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (const reference variant)....
Definition: interprocess.h:149
goby::middleware::InterProcessTransporterBase::InterProcessTransporterBase
InterProcessTransporterBase()
Definition: interprocess.h:67
goby::middleware::InterProcessTransporterBase::unsubscribe_dynamic
void unsubscribe_dynamic(const Group &group, const Subscriber< Data > &subscriber=Subscriber< Data >())
Unsubscribe to a specific run-time defined group and data type. Where possible, prefer the static var...
Definition: interprocess.h:178
goby::middleware::InterProcessPortalBase
Definition: interprocess.h:447
goby::middleware::MarshallingScheme::DCCL
@ DCCL
Definition: interface.h:54
goby::middleware::InterProcessTransporterBase::publish_dynamic
void publish_dynamic(std::shared_ptr< const Data > data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to const data variant)....
Definition: interprocess.h:108
goby::middleware::InterProcessForwarder::InterProcessForwarder
InterProcessForwarder(InnerTransporter &inner)
Construct a forwarder for the interprocess layer.
Definition: interprocess.h:307
goby::middleware::scheme
constexpr int scheme()
Placeholder to provide an interface for the scheme() function family.
Definition: cstr.h:65
goby::Exception
simple exception class for goby applications
Definition: exception.h:34
goby::middleware::InterProcessTransporterBase::check_validity_runtime
void check_validity_runtime(const Group &group)
Check validity of the Group for interthread use (for DynamicGroup at run time)
Definition: interprocess.h:263
serialization_handlers.h
goby::middleware::InterProcessTransporterBase::subscribe_type_regex
void subscribe_type_regex(std::function< void(std::shared_ptr< const Data >, const std::string &type)> f, const std::string &type_regex=".*")
Subscribe to a number of types within a given group and scheme using a regular expression.
Definition: interprocess.h:245
poller.h
goby::middleware::InterProcessPortalBase::InterProcessPortalBase
InterProcessPortalBase(InnerTransporter &inner)
Definition: interprocess.h:452
goby::middleware::InterProcessTransporterBase::from_portal_group_
static constexpr Group from_portal_group_
Definition: interprocess.h:272
goby::middleware::InterProcessTransporterBase::subscribe_regex
std::shared_ptr< SerializationSubscriptionRegex > subscribe_regex(std::function< void(const std::vector< unsigned char > &, int scheme, const std::string &type, const Group &group)> f, const std::set< int > &schemes, const std::string &type_regex=".*", const std::string &group_regex=".*")
Subscribe to multiple groups and/or types at once using regular expressions.
Definition: interprocess.h:196
goby::middleware::InterProcessTransporterBase::~InterProcessTransporterBase
virtual ~InterProcessTransporterBase()
Definition: interprocess.h:69