Goby3  3.1.4
2024.02.22
intermodule.h
Go to the documentation of this file.
1 // Copyright 2016-2023:
2 // GobySoft, LLC (2013-)
3 // Community contributors (see AUTHORS file)
4 // File authors:
5 // Toby Schneider <toby@gobysoft.org>
6 //
7 //
8 // This file is part of the Goby Underwater Autonomy Project Libraries
9 // ("The Goby Libraries").
10 //
11 // The Goby Libraries are free software: you can redistribute them and/or modify
12 // them under the terms of the GNU Lesser General Public License as published by
13 // the Free Software Foundation, either version 2.1 of the License, or
14 // (at your option) any later version.
15 //
16 // The Goby Libraries are distributed in the hope that they will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU Lesser General Public License for more details.
20 //
21 // You should have received a copy of the GNU Lesser General Public License
22 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
23 
24 #ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERMODULE_H
25 #define GOBY_MIDDLEWARE_TRANSPORT_INTERMODULE_H
26 
27 #include <atomic>
28 #include <functional>
29 #include <sys/types.h>
30 #include <thread>
31 #include <tuple>
32 #include <unistd.h>
33 
34 #include "goby/middleware/group.h"
35 
41 
42 namespace goby
43 {
44 namespace middleware
45 {
46 namespace protobuf
47 {
// Body of a comparison between two keys k1 and k2. The fields are examined in
// priority order: marshalling_scheme(), then type(), then group(). When type
// differs the result is (k1.type() < k2.type()); when only group differs the
// result is (k1.group() < k2.group()); when type and group are both equal the
// innermost branch yields false.
// NOTE(review): the function signature (listing line 48) and the true-branch of
// the outermost ternary (listing line 51) are absent from this listing.
// Presumably the missing branch is
// `? (k1.marshalling_scheme() < k2.marshalling_scheme())` and this is the
// ordering used for the SerializerTransporterKey multimap key — confirm against
// the real header.
49 {
50  return k1.marshalling_scheme() != k2.marshalling_scheme()
52  : (k1.type() != k2.type()
53  ? (k1.type() < k2.type())
54  : (k1.group() != k2.group() ? (k1.group() < k2.group()) : false));
55 }
56 } // namespace protobuf
57 
58 template <typename Derived, typename InnerTransporter>
60 
65 template <typename InnerTransporter>
67  : public InterModuleTransporterBase<InterModuleForwarder<InnerTransporter>, InnerTransporter>
68 {
69  public:
70  using Base =
72 
// Construct a forwarder for the intermodule layer.
// `inner` is handed straight to the Base constructor; nothing else is
// initialised here.
76  InterModuleForwarder(InnerTransporter& inner) : Base(inner) {}
// On destruction, calls unsubscribe_all() on this object before the members are
// torn down.
77  virtual ~InterModuleForwarder() { this->unsubscribe_all(); }
78 
79  friend Base;
80 
81  private:
// Serialises `d` with SerializerParserHelper<Data, scheme>, wraps the bytes in
// `msg` together with a key (marshalling scheme, group, publisher
// configuration), and publishes `msg` on the inner transporter to
// Base::to_portal_group_.
//   d         - the data to serialise and forward
//   group     - converted to std::string and stored in the key
//   publisher - its cfg() is copied into the key's cfg field
// NOTE(review): the declaration of `msg` (listing line 89) and listing line 93
// are missing from this listing; line 93 presumably sets the key's type —
// confirm against the real header.
82  template <typename Data, int scheme>
83  void _publish(const Data& d, const Group& group, const Publisher<Data>& publisher)
84  {
85  //create and forward publication to edge
86 
87  std::vector<char> bytes(SerializerParserHelper<Data, scheme>::serialize(d));
    // Heap-allocated copy of the serialised bytes; it is handed to `msg` below
    // via set_allocated_data(). NOTE(review): by the protobuf generated-code
    // convention set_allocated_*() takes ownership of the pointer, so no
    // delete is expected here — confirm. If anything throws between this line
    // and set_allocated_data(), the string would leak.
88  std::string* sbytes = new std::string(bytes.begin(), bytes.end());
90  auto* key = msg.mutable_key();
91 
92  key->set_marshalling_scheme(scheme);
94  key->set_group(std::string(group));
95  msg.set_allocated_data(sbytes);
96 
97  *key->mutable_cfg() = publisher.cfg();
98  this->inner().template publish<Base::to_portal_group_>(msg);
99  }
100 
// Registers a local subscription for (scheme, type name of Data, group) and
// announces it by publishing a Subscription message with action SUBSCRIBE on the
// inner transporter to Base::to_portal_group_.
//   f          - callback stored in the local SerializationSubscription
//   group      - group to subscribe to; also returned by the lambda given to
//                the locally constructed Subscriber
//   subscriber - not referenced by any line visible in this listing
101  template <typename Data, int scheme>
102  void _subscribe(std::function<void(std::shared_ptr<const Data> d)> f, const Group& group,
103  const Subscriber<Data>& subscriber)
104  {
    // Only when there are no subscriptions yet: set up a callback on the inner
    // transporter. For each incoming `msg` the callback looks up every entry
    // of subscriptions_ whose key equals msg.key() and posts msg.data() to it.
    // NOTE(review): listing lines 107-109 (the call on inner() and the lambda
    // header) are missing; presumably this subscribes to
    // Base::from_portal_group_ for SerializerTransporterMessage — confirm.
105  if (subscriptions_.empty())
106  this->inner()
110  auto range = subscriptions_.equal_range(msg.key());
111  for (auto it = range.first; it != range.second; ++it)
112  { it->second->post(msg.data().begin(), msg.data().end()); }
113  });
114 
    // The local handler is built with a fresh Subscriber (default
    // TransporterConfig, group function that always returns `group`) rather
    // than with the `subscriber` argument.
115  auto local_subscription = std::make_shared<SerializationSubscription<Data, scheme>>(
116  f, group,
117  middleware::Subscriber<Data>(goby::middleware::protobuf::TransporterConfig(),
118  [=](const Data& d) { return group; }));
119 
    // NOTE(review): listing line 120 is missing here.
    // Describe this subscription: the id comes from
    // full_process_and_thread_id(), the key is scheme + type name + group.
121  Subscription subscription;
122  subscription.set_id(full_process_and_thread_id());
123  subscription.mutable_key()->set_marshalling_scheme(scheme);
124  subscription.mutable_key()->set_type(SerializerParserHelper<Data, scheme>::type_name());
125  subscription.mutable_key()->set_group(std::string(group));
126  subscription.set_action(Subscription::SUBSCRIBE);
127 
128  this->inner().template publish<Base::to_portal_group_>(subscription);
129 
    // Record the handler under the same key so the callback above can find it.
130  subscriptions_.insert(std::make_pair(subscription.key(), local_subscription));
131  }
132 
// Publishes a Subscription message with action UNSUBSCRIBE for
// (scheme, type name of Data, group) to Base::to_portal_group_, then drops the
// matching local entries from subscriptions_.
//   group - group to unsubscribe from; stored in the key as std::string
133  template <typename Data, int scheme> void _unsubscribe(const Group& group)
134  {
    // NOTE(review): listing line 135 is missing here.
136  Subscription unsubscription;
137  unsubscription.set_id(full_process_and_thread_id());
138  unsubscription.mutable_key()->set_marshalling_scheme(scheme);
139  unsubscription.mutable_key()->set_type(SerializerParserHelper<Data, scheme>::type_name());
140  unsubscription.mutable_key()->set_group(std::string(group));
141  unsubscription.set_action(Subscription::UNSUBSCRIBE);
142  this->inner().template publish<Base::to_portal_group_>(unsubscription);
143 
    // subscriptions_ is a multimap, so erase-by-key removes every entry stored
    // under this key, not just one.
144  subscriptions_.erase(unsubscription.key());
145 
    // Once no local subscriptions remain, a call is made on the inner
    // transporter for protobuf::SerializerTransporterMessage.
    // NOTE(review): listing line 148 (the call itself) is missing; presumably
    // it unsubscribes from Base::from_portal_group_ — confirm.
146  if (subscriptions_.empty())
147  this->inner()
149  protobuf::SerializerTransporterMessage>();
150  }
151 
// Publishes a Subscription message with action UNSUBSCRIBE_ALL (carrying only
// this process/thread id, no key) to Base::to_portal_group_, then clears every
// local entry in subscriptions_.
152  void _unsubscribe_all()
153  {
    // NOTE(review): listing line 154 is missing here.
155  Subscription unsubscription;
156  unsubscription.set_id(full_process_and_thread_id());
157  unsubscription.set_action(Subscription::UNSUBSCRIBE_ALL);
158  this->inner().template publish<Base::to_portal_group_>(unsubscription);
159 
160  subscriptions_.clear();
    // Unconditional call on the inner transporter for
    // protobuf::SerializerTransporterMessage.
    // NOTE(review): listing line 162 (the call itself) is missing; presumably
    // it unsubscribes from Base::from_portal_group_ — confirm.
161  this->inner()
163  protobuf::SerializerTransporterMessage>();
164  }
165 
166  // not yet implemented
167  // void _subscribe_regex(std::function<void(const std::vector<unsigned char>&, int scheme,
168  // const std::string& type, const Group& group)>
169  // f,
170  // const std::set<int>& schemes, const std::string& type_regex = ".*",
171  // const std::string& group_regex = ".*")
172  // {
173  // }
174 
// Poll hook for this layer. Always returns 0 and never touches `lock`.
//   lock - unused in this implementation
175  int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock)
176  {
177  return 0;
178  } // A forwarder is a shell, only the inner Transporter has data
179 
180  private:
// Local subscription handlers, keyed by SerializerTransporterKey. It is a
// multimap, so several handlers can be stored under one key; _subscribe inserts
// entries, _unsubscribe erases by key, and _unsubscribe_all clears the
// container.
181  std::multimap<protobuf::SerializerTransporterKey,
182  std::shared_ptr<const middleware::SerializationHandlerBase<>>>
183  subscriptions_;
184 };
185 
// Base class for intermodule portals. Derived is the concrete portal type;
// calls are dispatched to it with static_cast<Derived*>(this), so Derived must
// provide _receive_publication_forwarded(), _receive_subscription_forwarded()
// and _unsubscribe_all(id).
// NOTE(review): listing lines 190 (presumably the `using Base = ...` alias),
// 195 (the destructor, per the cross-reference index) and 200-201 are missing
// from this listing — confirm against the real header.
186 template <typename Derived, typename InnerTransporter>
187 class InterModulePortalBase : public InterModuleTransporterBase<Derived, InnerTransporter>
188 {
189  public:
191 
    // Both constructors install the inner-transporter subscriptions via
    // _init(); the first also passes `inner` to Base.
192  InterModulePortalBase(InnerTransporter& inner) : Base(inner) { _init(); }
193  InterModulePortalBase() { _init(); }
194 
196 
197  private:
    // Subscribes on the inner transporter to the two message types arriving on
    // Base::to_portal_group_: forwarded publications and subscription
    // requests.
198  void _init()
199  {
    // Forwarded publications are passed unchanged to the derived portal.
202  this->inner().template subscribe<Base::to_portal_group_, SerializerTransporterMessage>(
203  [this](const SerializerTransporterMessage& d) {
204  static_cast<Derived*>(this)->_receive_publication_forwarded(d);
205  });
206 
    // Subscription requests: build a SerializationInterModuleSubscription
    // whose callback republishes matching data to Base::from_portal_group_,
    // then act on the requested action.
207  this->inner().template subscribe<Base::to_portal_group_, Subscription>(
208  [this](const Subscription& s) {
209  auto on_subscribe = [this](const SerializerTransporterMessage& d) {
210  this->inner().template publish<Base::from_portal_group_>(d);
211  };
    // `sub` is constructed for every action, although the UNSUBSCRIBE_ALL
    // branch below does not use it.
212  auto sub = std::make_shared<SerializationInterModuleSubscription>(on_subscribe, s);
213 
214  switch (s.action())
215  {
    // SUBSCRIBE and UNSUBSCRIBE share one handler in the derived class.
216  case Subscription::SUBSCRIBE:
217  case Subscription::UNSUBSCRIBE:
218  static_cast<Derived*>(this)->_receive_subscription_forwarded(sub);
219  break;
    // UNSUBSCRIBE_ALL is forwarded with only the requester's id.
220  case Subscription::UNSUBSCRIBE_ALL:
221  static_cast<Derived*>(this)->_unsubscribe_all(s.id());
222  break;
223  }
224  });
225  }
226 };
227 
228 } // namespace middleware
229 } // namespace goby
230 
231 #endif
goby::middleware::InterModuleForwarder::Base
friend Base
Definition: intermodule.h:79
goby::middleware::intermodule::protobuf::Subscription::set_id
void set_id(const ::std::string &value)
Definition: intermodule.pb.h:304
goby::middleware::protobuf::SerializerTransporterMessage
Definition: serializer_transporter.pb.h:462
goby
The global namespace for the Goby project.
Definition: acomms_constants.h:33
goby::middleware::SerializerParserHelper
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
Definition: interface.h:97
goby::middleware::StaticTransporterInterface< InterProcessTransporterBase< Derived, InnerTransporter >, InnerTransporter >::subscribe
void subscribe(std::function< void(const Data &)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (const reference variant)
Definition: interface.h:270
goby::middleware::InterModuleForwarder::~InterModuleForwarder
virtual ~InterModuleForwarder()
Definition: intermodule.h:77
goby::middleware::Subscriber
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
Definition: subscriber.h:36
group.h
goby::middleware::protobuf::SerializerTransporterKey::type
const ::std::string & type() const
Definition: serializer_transporter.pb.h:901
goby::middleware::intermodule::protobuf::Subscription
Definition: intermodule.pb.h:99
goby::middleware::protobuf::SerializerTransporterKey
Definition: serializer_transporter.pb.h:244
group
goby::util::logger::GroupSetter group(std::string n)
Definition: logger_manipulators.h:134
goby::middleware::Publisher
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition: driver_thread.h:69
goby::middleware::InterModuleForwarder
Implements the forwarder concept for the intermodule layer.
Definition: intermodule.h:66
goby::middleware::InterModulePortalBase::InterModulePortalBase
InterModulePortalBase(InnerTransporter &inner)
Definition: intermodule.h:192
goby::middleware::protobuf::TransporterConfig
Definition: transporter_config.pb.h:74
goby::util::logger_lock::lock
@ lock
Definition: flex_ostreambuf.h:62
goby::middleware::InterModulePortalBase::InterModulePortalBase
InterModulePortalBase()
Definition: intermodule.h:193
goby::middleware::full_process_and_thread_id
std::string full_process_and_thread_id(std::thread::id i=std::this_thread::get_id())
Definition: common.h:104
goby::middleware::protobuf::SerializerTransporterKey::group
const ::std::string & group() const
Definition: serializer_transporter.pb.h:967
goby::middleware::protobuf::operator<
bool operator<(const TCPEndPoint &ep_a, const TCPEndPoint &ep_b)
Definition: tcp_server_interface.h:60
goby::middleware::InterProcessTransporterBase::scheme
static constexpr int scheme()
returns the marshalling scheme id for a given data type on this layer
Definition: interprocess.h:242
null.h
goby::middleware::SerializerParserHelper::type_name
static std::string type_name()
The marshalling scheme specific string name for this type.
Definition: interface.h:107
goby::middleware::InterProcessTransporterBase
Base class for implementing transporters (both portal and forwarder) for the interprocess layer.
Definition: interprocess.h:51
goby::middleware::Publisher::cfg
const goby::middleware::protobuf::TransporterConfig & cfg() const
Returns the metadata configuration.
Definition: publisher.h:81
interprocess.h
goby::middleware::InterModulePortalBase::~InterModulePortalBase
virtual ~InterModulePortalBase()
Definition: intermodule.h:195
goby::msg
extern ::google::protobuf::internal::ExtensionIdentifier< ::google::protobuf::MessageOptions, ::google::protobuf::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
Definition: option_extensions.pb.h:1327
goby::middleware::protobuf::SerializerTransporterKey::marshalling_scheme
::google::protobuf::int32 marshalling_scheme() const
Definition: serializer_transporter.pb.h:877
goby::middleware::InnerTransporterInterface< InterProcessTransporterBase< Derived, InnerTransporter >, InnerTransporter >::inner
InnerTransporter & inner()
Definition: interface.h:63
goby::middleware::StaticTransporterInterface< InterProcessTransporterBase< Derived, InnerTransporter >, InnerTransporter >::unsubscribe
void unsubscribe(const Subscriber< Data > &subscriber=Subscriber< Data >())
Unsubscribe to a specific group and data type.
Definition: interface.h:323
goby::middleware::Group
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition: group.h:58
goby::middleware::InterProcessTransporterBase::unsubscribe_all
void unsubscribe_all()
Unsubscribe from all current subscriptions.
Definition: interprocess.h:173
goby::middleware::InterModulePortalBase
Definition: intermodule.h:187
serialization_handlers.h
poller.h
goby::middleware::InterProcessTransporterBase::from_portal_group_
static constexpr Group from_portal_group_
Definition: interprocess.h:271
goby::middleware::InterModuleForwarder::InterModuleForwarder
InterModuleForwarder(InnerTransporter &inner)
Construct a forwarder for the intermodule layer.
Definition: intermodule.h:76
intermodule.pb.h