Goby3  3.1.4
2024.02.22
interprocess.h
Go to the documentation of this file.
1 // Copyright 2016-2023:
2 // GobySoft, LLC (2013-)
3 // Community contributors (see AUTHORS file)
4 // File authors:
5 // Toby Schneider <toby@gobysoft.org>
6 // Ryan Govostes <rgovostes+git@gmail.com>
7 //
8 //
9 // This file is part of the Goby Underwater Autonomy Project Libraries
10 // ("The Goby Libraries").
11 //
12 // The Goby Libraries are free software: you can redistribute them and/or modify
13 // them under the terms of the GNU Lesser General Public License as published by
14 // the Free Software Foundation, either version 2.1 of the License, or
15 // (at your option) any later version.
16 //
17 // The Goby Libraries are distributed in the hope that they will be useful,
18 // but WITHOUT ANY WARRANTY; without even the implied warranty of
19 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 // GNU Lesser General Public License for more details.
21 //
22 // You should have received a copy of the GNU Lesser General Public License
23 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
24 
25 #ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERPROCESS_H
26 #define GOBY_MIDDLEWARE_TRANSPORT_INTERPROCESS_H
27 
28 #include <atomic>
29 #include <functional>
30 #include <sys/types.h>
31 #include <thread>
32 #include <tuple>
33 #include <unistd.h>
34 
35 #include "goby/middleware/group.h"
36 
41 
42 namespace goby
43 {
44 namespace middleware
45 {
50 template <typename Derived, typename InnerTransporter>
52  : public StaticTransporterInterface<InterProcessTransporterBase<Derived, InnerTransporter>,
53  InnerTransporter>,
54  public Poller<InterProcessTransporterBase<Derived, InnerTransporter>>
55 {
56  using InterfaceType =
58  InnerTransporter>;
59 
61 
62  public:
64  : InterfaceType(inner), PollerType(&this->inner())
65  {
66  }
68 
70 
78  template <typename Data, int scheme = scheme<Data>()>
79  void publish_dynamic(const Data& data, const Group& group,
80  const Publisher<Data>& publisher = Publisher<Data>())
81  {
83  static_cast<Derived*>(this)->template _publish<Data, scheme>(data, group, publisher);
84  this->inner().template publish_dynamic<Data, scheme>(data, group, publisher);
85  }
86 
94  template <typename Data, int scheme = scheme<Data>()>
95  void publish_dynamic(std::shared_ptr<const Data> data, const Group& group,
96  const Publisher<Data>& publisher = Publisher<Data>())
97  {
98  if (data)
99  {
101  static_cast<Derived*>(this)->template _publish<Data, scheme>(*data, group, publisher);
102  this->inner().template publish_dynamic<Data, scheme>(data, group, publisher);
103  }
104  }
105 
113  template <typename Data, int scheme = scheme<Data>()>
114  void publish_dynamic(std::shared_ptr<Data> data, const Group& group,
115  const Publisher<Data>& publisher = Publisher<Data>())
116  {
117  publish_dynamic<Data, scheme>(std::shared_ptr<const Data>(data), group, publisher);
118  }
119 
121  void publish_serialized(std::string type_name, int scheme, const std::vector<char>& bytes,
123  {
125  static_cast<Derived*>(this)->_publish_serialized(type_name, scheme, bytes, group);
126  }
127 
135  template <typename Data, int scheme = scheme<Data>()>
136  void subscribe_dynamic(std::function<void(const Data&)> f, const Group& group,
137  const Subscriber<Data>& subscriber = Subscriber<Data>())
138  {
140  static_cast<Derived*>(this)->template _subscribe<Data, scheme>(
141  [=](std::shared_ptr<const Data> d) { f(*d); }, group, subscriber);
142  }
143 
151  template <typename Data, int scheme = scheme<Data>()>
152  void subscribe_dynamic(std::function<void(std::shared_ptr<const Data>)> f, const Group& group,
153  const Subscriber<Data>& subscriber = Subscriber<Data>())
154  {
156  static_cast<Derived*>(this)->template _subscribe<Data, scheme>(f, group, subscriber);
157  }
158 
164  template <typename Data, int scheme = scheme<Data>()>
166  const Subscriber<Data>& subscriber = Subscriber<Data>())
167  {
169  static_cast<Derived*>(this)->template _unsubscribe<Data, scheme>(group, subscriber);
170  }
171 
173  void unsubscribe_all() { static_cast<Derived*>(this)->_unsubscribe_all(); }
174 
182  std::shared_ptr<SerializationSubscriptionRegex>
183  subscribe_regex(std::function<void(const std::vector<unsigned char>&, int scheme,
184  const std::string& type, const Group& group)>
185  f,
186  const std::set<int>& schemes, const std::string& type_regex = ".*",
187  const std::string& group_regex = ".*")
188  {
189  return static_cast<Derived*>(this)->_subscribe_regex(f, schemes, type_regex, group_regex);
190  }
191 
201  template <typename Data, int scheme = scheme<Data>()>
202  std::shared_ptr<SerializationSubscriptionRegex> subscribe_type_regex(
203  std::function<void(std::shared_ptr<const Data>, const std::string& type)> f,
204  const Group& group, const std::string& type_regex = ".*")
205  {
206  std::regex special_chars{R"([-[\]{}()*+?.,\^$|#\s])"};
207  std::string sanitized_group =
208  std::regex_replace(std::string(group), special_chars, R"(\$&)");
209 
210  auto regex_lambda = [=](const std::vector<unsigned char>& data, int schm,
211  const std::string& type, const Group& grp) {
212  auto data_begin = data.begin(), data_end = data.end(), actual_end = data.end();
213  auto msg =
214  SerializerParserHelper<Data, scheme>::parse(data_begin, data_end, actual_end, type);
215  f(msg, type);
216  };
217 
218  return static_cast<Derived*>(this)->_subscribe_regex(regex_lambda, {scheme}, type_regex,
219  "^" + sanitized_group + "$");
220  }
221 
230  template <const Group& group, typename Data, int scheme = scheme<Data>()>
232  std::function<void(std::shared_ptr<const Data>, const std::string& type)> f,
233  const std::string& type_regex = ".*")
234  {
235  subscribe_type_regex(f, group, type_regex);
236  }
237 
242  template <typename Data> static constexpr int scheme()
243  {
244  int scheme = goby::middleware::scheme<Data>();
245  // if default returns DCCL, use PROTOBUF instead
248  return scheme;
249  }
250 
254  template <const Group& group> void check_validity()
255  {
256  static_assert((group.c_str() != nullptr) && (group.c_str()[0] != '\0'),
257  "goby::middleware::Group must have non-zero length string to publish on the "
258  "InterProcess layer");
259  }
260 
263  {
264  if ((group.c_str() == nullptr) || (group.c_str()[0] == '\0'))
265  throw(goby::Exception("Group must have a non-empty string for use on InterProcess"));
266  }
267 
268  protected:
269  static constexpr Group to_portal_group_{"goby::middleware::interprocess::to_portal"};
270  static constexpr Group regex_group_{"goby::middleware::interprocess::regex"};
271  static constexpr Group from_portal_group_{"goby::middleware::interprocess::from_portal"};
272 
273  private:
274  friend PollerType;
275  int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock)
276  {
277  return static_cast<Derived*>(this)->_poll(lock);
278  }
279 };
280 
281 template <typename Derived, typename InnerTransporter>
282 constexpr goby::middleware::Group
284 template <typename Derived, typename InnerTransporter>
285 constexpr goby::middleware::Group
287 template <typename Derived, typename InnerTransporter>
288 constexpr goby::middleware::Group
290 
295 template <typename InnerTransporter>
296 class InterProcessForwarder
297  : public InterProcessTransporterBase<InterProcessForwarder<InnerTransporter>, InnerTransporter>
298 {
299  public:
300  using Base =
302 
306  InterProcessForwarder(InnerTransporter& inner) : Base(inner)
307  {
308  this->inner()
309  .template subscribe<Base::regex_group_,
311  [this](
312  std::shared_ptr<const goby::middleware::protobuf::SerializerTransporterMessage>
313  msg) { _receive_regex_data_forwarded(msg); });
314  }
316  {
317  this->unsubscribe_all();
318 
319  // TODO - remove by adding in an explicit handshake with the unsubscribe_all publication so that we don't delete ourself (and thus our inner()) before the Portal has deleted all the subscriptions
320  usleep(1e5);
321  }
322 
323  friend Base;
324 
325  private:
326  template <typename Data, int scheme>
327  void _publish(const Data& d, const Group& group, const Publisher<Data>& publisher)
328  {
329  // create and forward publication to edge
330  std::vector<char> bytes(SerializerParserHelper<Data, scheme>::serialize(d));
331  std::string* sbytes = new std::string(bytes.begin(), bytes.end());
332  auto msg = std::make_shared<goby::middleware::protobuf::SerializerTransporterMessage>();
333  auto* key = msg->mutable_key();
334 
335  key->set_marshalling_scheme(scheme);
337  key->set_group(std::string(group));
338  msg->set_allocated_data(sbytes);
339 
340  *key->mutable_cfg() = publisher.cfg();
341 
342  this->inner().template publish<Base::to_portal_group_>(msg);
343  }
344 
345  void _publish_serialized(std::string type_name, int scheme, const std::vector<char>& bytes,
347  {
348  auto msg = std::make_shared<goby::middleware::protobuf::SerializerTransporterMessage>();
349  auto* key = msg->mutable_key();
350 
351  key->set_marshalling_scheme(scheme);
352  key->set_type(type_name);
353  key->set_group(std::string(group));
354  msg->set_data(std::string(bytes.begin(), bytes.end()));
355 
356  this->inner().template publish<Base::to_portal_group_>(msg);
357  }
358 
359  template <typename Data, int scheme>
360  void _subscribe(std::function<void(std::shared_ptr<const Data> d)> f, const Group& group,
361  const Subscriber<Data>& subscriber)
362  {
363  this->inner().template subscribe_dynamic<Data, scheme>(f, group);
364 
365  // forward subscription to edge
366  auto inner_publication_lambda = [=](std::shared_ptr<const Data> d) {
367  this->inner().template publish_dynamic<Data, scheme>(d, group);
368  };
369 
370  auto subscription = std::make_shared<SerializationSubscription<Data, scheme>>(
371  inner_publication_lambda, group,
372  middleware::Subscriber<Data>(goby::middleware::protobuf::TransporterConfig(),
373  [=](const Data& d) { return group; }));
374 
375  this->inner().template publish<Base::to_portal_group_, SerializationHandlerBase<>>(
376  subscription);
377  }
378 
379  template <typename Data, int scheme>
380  void _unsubscribe(const Group& group, const Subscriber<Data>& subscriber)
381  {
382  this->inner().template unsubscribe_dynamic<Data, scheme>(group, subscriber);
383 
384  auto unsubscription = std::shared_ptr<SerializationHandlerBase<>>(
385  new SerializationUnSubscription<Data, scheme>(group));
386 
387  this->inner().template publish<Base::to_portal_group_, SerializationHandlerBase<>>(
388  unsubscription);
389  }
390 
391  void _unsubscribe_all()
392  {
393  regex_subscriptions_.clear();
394  auto all = std::make_shared<SerializationUnSubscribeAll>();
395  this->inner().template publish<Base::to_portal_group_, SerializationUnSubscribeAll>(all);
396  }
397 
398  std::shared_ptr<SerializationSubscriptionRegex>
399  _subscribe_regex(std::function<void(const std::vector<unsigned char>&, int scheme,
400  const std::string& type, const Group& group)>
401  f,
402  const std::set<int>& schemes, const std::string& type_regex = ".*",
403  const std::string& group_regex = ".*")
404  {
405  auto inner_publication_lambda = [=](const std::vector<unsigned char>& data, int scheme,
406  const std::string& type, const Group& group) {
407  std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage>
409  forwarded_data->mutable_key()->set_marshalling_scheme(scheme);
410  forwarded_data->mutable_key()->set_type(type);
411  forwarded_data->mutable_key()->set_group(group);
412  forwarded_data->set_data(std::string(data.begin(), data.end()));
413  this->inner().template publish<Base::regex_group_>(forwarded_data);
414  };
415 
416  auto portal_subscription = std::make_shared<SerializationSubscriptionRegex>(
417  inner_publication_lambda, schemes, type_regex, group_regex);
418  this->inner().template publish<Base::to_portal_group_, SerializationSubscriptionRegex>(
419  portal_subscription);
420 
421  auto local_subscription = std::shared_ptr<SerializationSubscriptionRegex>(
422  new SerializationSubscriptionRegex(f, schemes, type_regex, group_regex));
423  regex_subscriptions_.insert(local_subscription);
424  return local_subscription;
425  }
426 
427  void _receive_regex_data_forwarded(
428  std::shared_ptr<const goby::middleware::protobuf::SerializerTransporterMessage> msg)
429  {
430  const auto& bytes = msg->data();
431  for (auto& sub : regex_subscriptions_)
432  sub->post(bytes.begin(), bytes.end(), msg->key().marshalling_scheme(),
433  msg->key().type(), msg->key().group());
434  }
435 
436  int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock)
437  {
438  return 0;
439  } // A forwarder is a shell, only the inner Transporter has data
440 
441  private:
442  std::set<std::shared_ptr<const SerializationSubscriptionRegex>> regex_subscriptions_;
443 };
444 
445 template <typename Derived, typename InnerTransporter>
446 class InterProcessPortalBase : public InterProcessTransporterBase<Derived, InnerTransporter>
447 {
448  public:
450 
451  InterProcessPortalBase(InnerTransporter& inner) : Base(inner) { _init(); }
452  InterProcessPortalBase() { _init(); }
453 
455 
456  private:
457  void _init()
458  {
460  this->inner().template subscribe<Base::to_portal_group_, SerializerTransporterMessage>(
461  [this](std::shared_ptr<const SerializerTransporterMessage> d) {
462  static_cast<Derived*>(this)->_receive_publication_forwarded(*d);
463  });
464 
465  this->inner().template subscribe<Base::to_portal_group_, SerializationHandlerBase<>>(
466  [this](std::shared_ptr<const middleware::SerializationHandlerBase<>> s) {
467  static_cast<Derived*>(this)->_receive_subscription_forwarded(s);
468  });
469 
470  this->inner().template subscribe<Base::to_portal_group_, SerializationSubscriptionRegex>(
471  [this](std::shared_ptr<const middleware::SerializationSubscriptionRegex> s) {
472  static_cast<Derived*>(this)->_receive_regex_subscription_forwarded(s);
473  });
474 
475  this->inner().template subscribe<Base::to_portal_group_, SerializationUnSubscribeAll>(
476  [this](std::shared_ptr<const middleware::SerializationUnSubscribeAll> s) {
477  static_cast<Derived*>(this)->_unsubscribe_all(s->subscriber_id());
478  });
479  }
480 };
481 
482 } // namespace middleware
483 } // namespace goby
484 
485 #endif
goby::middleware::InterProcessTransporterBase::InterProcessTransporterBase
InterProcessTransporterBase(InnerTransporter &inner)
Definition: interprocess.h:63
goby::middleware::SerializerParserHelper::parse
static std::shared_ptr< DataType > parse(CharIterator bytes_begin, CharIterator bytes_end, CharIterator &actual_end, const std::string &type=type_name())
Given a beginning and end iterator to bytes, parse the data and return it.
Definition: interface.h:129
goby::middleware::StaticTransporterInterface
Defines the common interface for publishing and subscribing data using static (constexpr) groups on G...
Definition: interface.h:203
goby::middleware::protobuf::SerializerTransporterMessage
Definition: serializer_transporter.pb.h:462
interface.h
goby
The global namespace for the Goby project.
Definition: acomms_constants.h:33
goby::middleware::SerializerParserHelper
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
Definition: interface.h:97
goby::middleware::InterProcessForwarder::~InterProcessForwarder
virtual ~InterProcessForwarder()
Definition: interprocess.h:315
goby::middleware::Subscriber
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
Definition: subscriber.h:36
group.h
goby::middleware::InterProcessTransporterBase::publish_serialized
void publish_serialized(std::string type_name, int scheme, const std::vector< char > &bytes, const goby::middleware::Group &group)
Publish a message that has already been serialized for the given scheme.
Definition: interprocess.h:121
goby::middleware::Poller
Utility class for allowing the various Goby middleware transporters to poll the underlying transport ...
Definition: poller.h:37
group
goby::util::logger::GroupSetter group(std::string n)
Definition: logger_manipulators.h:134
goby::middleware::Publisher
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition: driver_thread.h:69
goby::middleware::MarshallingScheme::PROTOBUF
@ PROTOBUF
Definition: interface.h:53
goby::middleware::InterProcessTransporterBase::regex_group_
static constexpr Group regex_group_
Definition: interprocess.h:270
goby::middleware::InterProcessPortalBase::~InterProcessPortalBase
virtual ~InterProcessPortalBase()
Definition: interprocess.h:454
goby::middleware::InterProcessTransporterBase::subscribe_type_regex
std::shared_ptr< SerializationSubscriptionRegex > subscribe_type_regex(std::function< void(std::shared_ptr< const Data >, const std::string &type)> f, const Group &group, const std::string &type_regex=".*")
Subscribe to a number of types within a given group and scheme using a regular expression.
Definition: interprocess.h:202
goby::middleware::protobuf::TransporterConfig
Definition: transporter_config.pb.h:74
goby::util::logger_lock::lock
@ lock
Definition: flex_ostreambuf.h:62
goby::middleware::InterProcessTransporterBase::check_validity
void check_validity()
Check validity of the Group for interthread use (at compile time)
Definition: interprocess.h:254
goby::middleware::InterProcessTransporterBase::publish_dynamic
void publish_dynamic(const Data &data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (const reference variant)....
Definition: interprocess.h:79
goby::middleware::InterProcessTransporterBase::scheme
static constexpr int scheme()
returns the marshalling scheme id for a given data type on this layer
Definition: interprocess.h:242
null.h
goby::middleware::InterProcessForwarder::Base
friend Base
Definition: interprocess.h:323
goby::middleware::InterProcessTransporterBase
Base class for implementing transporters (both portal and forwarder) for the interprocess layer.
Definition: interprocess.h:51
goby::middleware::InterProcessPortalBase::InterProcessPortalBase
InterProcessPortalBase()
Definition: interprocess.h:452
goby::middleware::InterProcessTransporterBase::publish_dynamic
void publish_dynamic(std::shared_ptr< Data > data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to mutable data variant)....
Definition: interprocess.h:114
goby::middleware::InterProcessTransporterBase::to_portal_group_
static constexpr Group to_portal_group_
Definition: interprocess.h:269
goby::middleware::InterProcessTransporterBase::subscribe_dynamic
void subscribe_dynamic(std::function< void(std::shared_ptr< const Data >)> f, const Group &group, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (shared pointer variant)....
Definition: interprocess.h:152
goby::middleware::Publisher::cfg
const goby::middleware::protobuf::TransporterConfig & cfg() const
Returns the metadata configuration.
Definition: publisher.h:81
goby::middleware::InterProcessForwarder::Base
InterProcessTransporterBase< InterProcessForwarder< InnerTransporter >, InnerTransporter > Base
Definition: interprocess.h:301
jwt::json::type
type
Generic JSON types used in JWTs.
Definition: jwt.h:2071
goby::msg
extern ::google::protobuf::internal::ExtensionIdentifier< ::google::protobuf::MessageOptions, ::google::protobuf::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
Definition: option_extensions.pb.h:1327
goby::middleware::InnerTransporterInterface< InterProcessTransporterBase< Derived, InnerTransporter >, InnerTransporter >::inner
InnerTransporter & inner()
Definition: interface.h:63
goby::middleware::Group
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition: group.h:58
goby::middleware::InterProcessTransporterBase::unsubscribe_all
void unsubscribe_all()
Unsubscribe from all current subscriptions.
Definition: interprocess.h:173
goby::middleware::InterProcessTransporterBase::subscribe_dynamic
void subscribe_dynamic(std::function< void(const Data &)> f, const Group &group, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (const reference variant)....
Definition: interprocess.h:136
goby::middleware::InterProcessTransporterBase::InterProcessTransporterBase
InterProcessTransporterBase()
Definition: interprocess.h:67
goby::middleware::InterProcessTransporterBase::unsubscribe_dynamic
void unsubscribe_dynamic(const Group &group, const Subscriber< Data > &subscriber=Subscriber< Data >())
Unsubscribe to a specific run-time defined group and data type. Where possible, prefer the static var...
Definition: interprocess.h:165
goby::middleware::InterProcessPortalBase
Definition: interprocess.h:446
goby::middleware::MarshallingScheme::DCCL
@ DCCL
Definition: interface.h:54
goby::middleware::InterProcessTransporterBase::publish_dynamic
void publish_dynamic(std::shared_ptr< const Data > data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to const data variant)....
Definition: interprocess.h:95
goby::middleware::InterProcessForwarder::InterProcessForwarder
InterProcessForwarder(InnerTransporter &inner)
Construct a forwarder for the interprocess layer.
Definition: interprocess.h:306
goby::middleware::scheme
constexpr int scheme()
Placeholder to provide an interface for the scheme() function family.
Definition: cstr.h:65
goby::Exception
simple exception class for goby applications
Definition: exception.h:34
goby::middleware::InterProcessTransporterBase::check_validity_runtime
void check_validity_runtime(const Group &group)
Check validity of the Group for interthread use (for DynamicGroup at run time)
Definition: interprocess.h:262
serialization_handlers.h
goby::middleware::InterProcessTransporterBase::subscribe_type_regex
void subscribe_type_regex(std::function< void(std::shared_ptr< const Data >, const std::string &type)> f, const std::string &type_regex=".*")
Subscribe to a number of types within a given group and scheme using a regular expression.
Definition: interprocess.h:231
poller.h
goby::middleware::InterProcessPortalBase::InterProcessPortalBase
InterProcessPortalBase(InnerTransporter &inner)
Definition: interprocess.h:451
goby::middleware::InterProcessTransporterBase::from_portal_group_
static constexpr Group from_portal_group_
Definition: interprocess.h:271
goby::middleware::InterProcessTransporterBase::subscribe_regex
std::shared_ptr< SerializationSubscriptionRegex > subscribe_regex(std::function< void(const std::vector< unsigned char > &, int scheme, const std::string &type, const Group &group)> f, const std::set< int > &schemes, const std::string &type_regex=".*", const std::string &group_regex=".*")
Subscribe to multiple groups and/or types at once using regular expressions.
Definition: interprocess.h:183
goby::middleware::InterProcessTransporterBase::~InterProcessTransporterBase
virtual ~InterProcessTransporterBase()
Definition: interprocess.h:69