Goby3 3.4.0
2026.04.13
Loading...
Searching...
No Matches
intermodule.h
Go to the documentation of this file.
1// Copyright 2016-2026:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6//
7//
8// This file is part of the Goby Underwater Autonomy Project Libraries
9// ("The Goby Libraries").
10//
11// The Goby Libraries are free software: you can redistribute them and/or modify
12// them under the terms of the GNU Lesser General Public License as published by
13// the Free Software Foundation, either version 2.1 of the License, or
14// (at your option) any later version.
15//
16// The Goby Libraries are distributed in the hope that they will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU Lesser General Public License for more details.
20//
21// You should have received a copy of the GNU Lesser General Public License
22// along with Goby. If not, see <http://www.gnu.org/licenses/>.
23
24#ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERMODULE_H
25#define GOBY_MIDDLEWARE_TRANSPORT_INTERMODULE_H
26
27#include <atomic>
28#include <functional>
29#include <sys/types.h>
30#include <thread>
31#include <tuple>
32#include <unistd.h>
33
35
41
42namespace goby
43{
44namespace middleware
45{
46namespace protobuf
47{
49{
50 return k1.marshalling_scheme() != k2.marshalling_scheme()
52 : (k1.type() != k2.type()
53 ? (k1.type() < k2.type())
54 : (k1.group() != k2.group() ? (k1.group() < k2.group()) : false));
55}
56} // namespace protobuf
57
58template <typename Derived, typename InnerTransporter, typename ImplementationTag>
61
67template <typename InnerTransporter, typename ImplementationTag = void> class InterModuleForwarder;
68
73template <typename InnerTransporter, typename ImplementationTag>
75 : public InterModuleTransporterBase<InterModuleForwarder<InnerTransporter, ImplementationTag>,
76 InnerTransporter, ImplementationTag>
77{
78 public:
79 using Base =
81 InnerTransporter, ImplementationTag>;
82
86 InterModuleForwarder(InnerTransporter& inner) : Base(inner) {}
88
89 friend Base;
90
91 private:
92 template <typename Data, int scheme>
93 void _publish(const Data& d, const Group& group, const Publisher<Data>& publisher)
94 {
95 //create and forward publication to edge
96
97 std::vector<char> bytes(SerializerParserHelper<Data, scheme>::serialize(d));
98 std::string* sbytes = new std::string(bytes.begin(), bytes.end());
100 auto* key = msg.mutable_key();
101
102 key->set_marshalling_scheme(scheme);
104 key->set_group(std::string(group));
105 msg.set_allocated_data(sbytes);
106
107 *key->mutable_cfg() = publisher.cfg();
108 this->inner().template publish<Base::to_portal_group_>(msg);
109 }
110
111 template <typename Data, int scheme>
112 void _subscribe(std::function<void(std::shared_ptr<const Data> d)> f, const Group& group,
113 const Subscriber<Data>& subscriber)
114 {
115 if (subscriptions_.empty())
116 this->inner()
120 {
121 auto range = subscriptions_.equal_range(msg.key());
122 for (auto it = range.first; it != range.second; ++it)
123 {
124 it->second->post(msg.data().begin(), msg.data().end());
125 }
126 });
127
128 auto local_subscription = std::make_shared<SerializationSubscription<Data, scheme>>(
129 f, group,
130 middleware::Subscriber<Data>(goby::middleware::protobuf::TransporterConfig(),
131 [=](const Data& d) { return group; }));
132
134 Subscription subscription;
135 subscription.set_id(full_process_and_thread_id());
136 subscription.mutable_key()->set_marshalling_scheme(scheme);
137 subscription.mutable_key()->set_type(SerializerParserHelper<Data, scheme>::type_name());
138 subscription.mutable_key()->set_group(std::string(group));
139 subscription.set_action(Subscription::SUBSCRIBE);
140
141 this->inner().template publish<Base::to_portal_group_>(subscription);
142
143 subscriptions_.insert(std::make_pair(subscription.key(), local_subscription));
144 }
145
146 template <typename Data, int scheme> void _unsubscribe(const Group& group)
147 {
149 Subscription unsubscription;
150 unsubscription.set_id(full_process_and_thread_id());
151 unsubscription.mutable_key()->set_marshalling_scheme(scheme);
152 unsubscription.mutable_key()->set_type(SerializerParserHelper<Data, scheme>::type_name());
153 unsubscription.mutable_key()->set_group(std::string(group));
154 unsubscription.set_action(Subscription::UNSUBSCRIBE);
155 this->inner().template publish<Base::to_portal_group_>(unsubscription);
156
157 subscriptions_.erase(unsubscription.key());
158
159 if (subscriptions_.empty())
160 this->inner()
162 protobuf::SerializerTransporterMessage>();
163 }
164
165 void _unsubscribe_all()
166 {
168 Subscription unsubscription;
169 unsubscription.set_id(full_process_and_thread_id());
170 unsubscription.set_action(Subscription::UNSUBSCRIBE_ALL);
171 this->inner().template publish<Base::to_portal_group_>(unsubscription);
172
173 subscriptions_.clear();
174 this->inner()
176 protobuf::SerializerTransporterMessage>();
177 }
178
179 // not yet implemented
180 // void _subscribe_regex(std::function<void(const std::vector<unsigned char>&, int scheme,
181 // const std::string& type, const Group& group)>
182 // f,
183 // const std::set<int>& schemes, const std::string& type_regex = ".*",
184 // const std::string& group_regex = ".*")
185 // {
186 // }
187
188 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock)
189 {
190 return 0;
191 } // A forwarder is a shell, only the inner Transporter has data
192
193 private:
194 std::multimap<protobuf::SerializerTransporterKey,
195 std::shared_ptr<const middleware::SerializationHandlerBase<>>>
196 subscriptions_;
197};
198
199template <typename Derived, typename InnerTransporter, typename ImplementationTag>
201 : public InterModuleTransporterBase<Derived, InnerTransporter, ImplementationTag>,
202 public InterProcessPortalCommon<Derived, InnerTransporter>
203{
204 public:
207
208 InterModulePortalBase(InnerTransporter& inner) : Base(inner) { _init(); }
210
212
213 private:
214 void _init()
215 {
218 this->inner().template subscribe<Base::to_portal_group_, SerializerTransporterMessage>(
219 [this](const SerializerTransporterMessage& d)
220 {
221 std::vector<char> data(d.data().begin(), d.data().end());
222 static_cast<Derived*>(this)->_publish_serialized(
223 d.key().type(), d.key().marshalling_scheme(), data,
224 goby::middleware::DynamicGroup(d.key().group()));
225 });
226
227 this->inner().template subscribe<Base::to_portal_group_, Subscription>(
228 [this](const Subscription& s)
229 {
230 auto on_subscribe = [this](const SerializerTransporterMessage& d)
231 { this->inner().template publish<Base::from_portal_group_>(d); };
232 auto sub = std::make_shared<SerializationInterModuleSubscription>(on_subscribe, s);
233
234 switch (s.action())
235 {
236 case Subscription::SUBSCRIBE:
237 case Subscription::UNSUBSCRIBE:
238 static_cast<Derived*>(this)->_receive_subscription_forwarded(sub);
239 break;
240 case Subscription::UNSUBSCRIBE_ALL:
241 static_cast<Derived*>(this)->_unsubscribe_all(s.id());
242 break;
243 }
244 });
245 }
246};
247
248} // namespace middleware
249} // namespace goby
250
252
253namespace goby
254{
255namespace middleware
256{
262template <typename InnerTransporter>
263class InterModuleForwarder<InnerTransporter, void>
264 : public InterModuleForwarder<InnerTransporter, zeromq::detail::InterModuleTag>
265{
266 public:
267 using Base = InterModuleForwarder<InnerTransporter, zeromq::detail::InterModuleTag>;
268
269 [[deprecated("Use zeromq::InterModuleForwarder<> or udpm::InterModuleForwarder<> instead of "
270 "middleware::InterModuleForwarder<>")]]
271 explicit InterModuleForwarder(InnerTransporter& inner)
272 : Base(inner)
273 {
274 }
275};
276} // namespace middleware
277} // namespace goby
278
279#endif
Implementation of Group for dynamic (run-time) instantiations. Use Group directly for static (compile...
Definition group.h:120
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:60
Implements the forwarder concept for the intermodule layer.
Definition intermodule.h:77
InterModuleTransporterBase< InterModuleForwarder< InnerTransporter, ImplementationTag >, InnerTransporter, ImplementationTag > Base
Definition intermodule.h:81
InterModuleForwarder(InnerTransporter &inner)
Construct a forwarder for the intermodule layer.
Definition intermodule.h:86
InterModulePortalBase(InnerTransporter &inner)
void _publish_serialized(std::string type_name, int scheme, const std::vector< char > &bytes, const goby::middleware::Group &group)
void _unsubscribe_all(const std::string &subscriber_id=middleware::identifier_part_to_string(std::this_thread::get_id()))
void _receive_subscription_forwarded(const std::shared_ptr< const middleware::SerializationHandlerBase<> > &subscription)
Base class for implementing transporters (both portal and forwarder) for the interprocess layer.
static constexpr int scheme()
returns the marshalling scheme id for a given data type on this layer
void unsubscribe_all()
Unsubscribe from all current subscriptions.
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition publisher.h:40
const goby::middleware::protobuf::TransporterConfig & cfg() const
Returns the metadata configuration.
Definition publisher.h:81
void subscribe(std::function< void(const Data &)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (const reference variant)
Definition interface.h:300
void unsubscribe(const Subscriber< Data > &subscriber=Subscriber< Data >())
Unsubscribe to a specific group and data type.
Definition interface.h:353
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
Definition subscriber.h:37
goby::util::logger::GroupSetter group(std::string n)
bool operator<(const TCPEndPoint &ep_a, const TCPEndPoint &ep_b)
std::string full_process_and_thread_id(std::thread::id i=std::this_thread::get_id())
Definition common.h:104
middleware::InterModuleForwarder< InnerTransporter, detail::InterModuleTag > InterModuleForwarder
Definition intermodule.h:43
The global namespace for the Goby project.
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::PROTOBUF_NAMESPACE_ID::MessageOptions, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
Definition interface.h:98
static std::string type_name()
The marshalling scheme specific string name for this type.
Definition interface.h:107