Goby3 3.5.1
2026.06.04
Loading...
Searching...
No Matches
intermodule.h
Go to the documentation of this file.
1// Copyright 2016-2026:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6// Copilot <198982749+Copilot@users.noreply.github.com>
7//
8//
9// This file is part of the Goby Underwater Autonomy Project Libraries
10// ("The Goby Libraries").
11//
12// The Goby Libraries are free software: you can redistribute them and/or modify
13// them under the terms of the GNU Lesser General Public License as published by
14// the Free Software Foundation, either version 2.1 of the License, or
15// (at your option) any later version.
16//
17// The Goby Libraries are distributed in the hope that they will be useful,
18// but WITHOUT ANY WARRANTY; without even the implied warranty of
19// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20// GNU Lesser General Public License for more details.
21//
22// You should have received a copy of the GNU Lesser General Public License
23// along with Goby. If not, see <http://www.gnu.org/licenses/>.
24
25#ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERMODULE_H
26#define GOBY_MIDDLEWARE_TRANSPORT_INTERMODULE_H
27
28#include <atomic>
29#include <functional>
30#include <sys/types.h>
31#include <thread>
32#include <tuple>
33#include <unistd.h>
34
36
42
43namespace goby
44{
45namespace middleware
46{
47namespace protobuf
48{
50{
51 return k1.marshalling_scheme() != k2.marshalling_scheme()
53 : (k1.type() != k2.type()
54 ? (k1.type() < k2.type())
55 : (k1.group() != k2.group() ? (k1.group() < k2.group()) : false));
56}
57} // namespace protobuf
58
59template <typename Derived, typename InnerTransporter, typename ImplementationTag>
62
68template <typename InnerTransporter, typename ImplementationTag = void> class InterModuleForwarder;
69
74template <typename InnerTransporter, typename ImplementationTag>
76 : public InterModuleTransporterBase<InterModuleForwarder<InnerTransporter, ImplementationTag>,
77 InnerTransporter, ImplementationTag>
78{
79 public:
80 using Base =
82 InnerTransporter, ImplementationTag>;
83
87 InterModuleForwarder(InnerTransporter& inner) : Base(inner) {}
89
90 friend Base;
91
92 private:
93 template <typename Data, int scheme>
94 void _publish(const Data& d, const Group& group, const Publisher<Data>& publisher)
95 {
96 //create and forward publication to edge
97
98 std::vector<char> bytes(SerializerParserHelper<Data, scheme>::serialize(d));
99 std::string* sbytes = new std::string(bytes.begin(), bytes.end());
101 auto* key = msg.mutable_key();
102
103 key->set_marshalling_scheme(scheme);
105 key->set_group(std::string(group));
106 msg.set_allocated_data(sbytes);
107
108 *key->mutable_cfg() = publisher.cfg();
109 this->inner().template publish<Base::to_portal_group_>(msg);
110 }
111
112 template <typename Data, int scheme>
113 void _subscribe(std::function<void(std::shared_ptr<const Data> d)> f, const Group& group,
114 const Subscriber<Data>& subscriber)
115 {
116 if (subscriptions_.empty())
117 this->inner()
121 {
122 auto range = subscriptions_.equal_range(msg.key());
123 for (auto it = range.first; it != range.second; ++it)
124 {
125 it->second->post(msg.data().begin(), msg.data().end());
126 }
127 });
128
129 auto local_subscription = std::make_shared<SerializationSubscription<Data, scheme>>(
130 f, group,
131 middleware::Subscriber<Data>(goby::middleware::protobuf::TransporterConfig(),
132 [=](const Data& d) { return group; }));
133
135 Subscription subscription;
136 subscription.set_id(full_process_and_thread_id());
137 subscription.mutable_key()->set_marshalling_scheme(scheme);
138 subscription.mutable_key()->set_type(SerializerParserHelper<Data, scheme>::type_name());
139 subscription.mutable_key()->set_group(std::string(group));
140 subscription.set_action(Subscription::SUBSCRIBE);
141
142 this->inner().template publish<Base::to_portal_group_>(subscription);
143
144 subscriptions_.insert(std::make_pair(subscription.key(), local_subscription));
145 }
146
147 template <typename Data, int scheme> void _unsubscribe(const Group& group)
148 {
150 Subscription unsubscription;
151 unsubscription.set_id(full_process_and_thread_id());
152 unsubscription.mutable_key()->set_marshalling_scheme(scheme);
153 unsubscription.mutable_key()->set_type(SerializerParserHelper<Data, scheme>::type_name());
154 unsubscription.mutable_key()->set_group(std::string(group));
155 unsubscription.set_action(Subscription::UNSUBSCRIBE);
156 this->inner().template publish<Base::to_portal_group_>(unsubscription);
157
158 subscriptions_.erase(unsubscription.key());
159
160 if (subscriptions_.empty())
161 this->inner()
163 protobuf::SerializerTransporterMessage>();
164 }
165
166 void _unsubscribe_all()
167 {
169 Subscription unsubscription;
170 unsubscription.set_id(full_process_and_thread_id());
171 unsubscription.set_action(Subscription::UNSUBSCRIBE_ALL);
172 this->inner().template publish<Base::to_portal_group_>(unsubscription);
173
174 subscriptions_.clear();
175 this->inner()
177 protobuf::SerializerTransporterMessage>();
178 }
179
180 // not yet implemented
181 // void _subscribe_regex(std::function<void(const std::vector<unsigned char>&, int scheme,
182 // const std::string& type, const Group& group)>
183 // f,
184 // const std::set<int>& schemes, const std::string& type_regex = ".*",
185 // const std::string& group_regex = ".*")
186 // {
187 // }
188
189 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock)
190 {
191 return 0;
192 } // A forwarder is a shell, only the inner Transporter has data
193
194 private:
195 std::multimap<protobuf::SerializerTransporterKey,
196 std::shared_ptr<const middleware::SerializationHandlerBase<>>>
197 subscriptions_;
198};
199
200template <typename Derived, typename InnerTransporter, typename ImplementationTag>
202 : public InterModuleTransporterBase<Derived, InnerTransporter, ImplementationTag>,
203 public InterProcessPortalCommon<Derived, InnerTransporter>
204{
205 public:
208
209 InterModulePortalBase(InnerTransporter& inner) : Base(inner) { _init(); }
211
213
214 private:
215 void _init()
216 {
219 this->inner().template subscribe<Base::to_portal_group_, SerializerTransporterMessage>(
220 [this](const SerializerTransporterMessage& d)
221 {
222 std::vector<char> data(d.data().begin(), d.data().end());
223 static_cast<Derived*>(this)->_publish_serialized(
224 d.key().type(), d.key().marshalling_scheme(), data,
225 goby::middleware::DynamicGroup(d.key().group()));
226 });
227
228 this->inner().template subscribe<Base::to_portal_group_, Subscription>(
229 [this](const Subscription& s)
230 {
231 auto on_subscribe = [this](const SerializerTransporterMessage& d)
232 { this->inner().template publish<Base::from_portal_group_>(d); };
233 auto sub = std::make_shared<SerializationInterModuleSubscription>(on_subscribe, s);
234
235 switch (s.action())
236 {
237 case Subscription::SUBSCRIBE:
238 case Subscription::UNSUBSCRIBE:
239 static_cast<Derived*>(this)->_receive_subscription_forwarded(sub);
240 break;
241 case Subscription::UNSUBSCRIBE_ALL:
242 static_cast<Derived*>(this)->_unsubscribe_all(s.id());
243 break;
244 }
245 });
246 }
247};
248
249} // namespace middleware
250} // namespace goby
251
253
254namespace goby
255{
256namespace middleware
257{
263template <typename InnerTransporter>
264class InterModuleForwarder<InnerTransporter, void>
265 : public InterModuleForwarder<InnerTransporter, zeromq::detail::InterModuleTag>
266{
267 public:
268 using Base = InterModuleForwarder<InnerTransporter, zeromq::detail::InterModuleTag>;
269
270 [[deprecated("Use zeromq::InterModuleForwarder<> or udpm::InterModuleForwarder<> instead of "
271 "middleware::InterModuleForwarder<>")]]
272 explicit InterModuleForwarder(InnerTransporter& inner)
273 : Base(inner)
274 {
275 }
276};
277} // namespace middleware
278} // namespace goby
279
280#endif
Implementation of Group for dynamic (run-time) instantiations. Use Group directly for static (compile...
Definition group.h:121
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:61
Implements the forwarder concept for the intermodule layer.
Definition intermodule.h:78
InterModuleTransporterBase< InterModuleForwarder< InnerTransporter, ImplementationTag >, InnerTransporter, ImplementationTag > Base
Definition intermodule.h:82
InterModuleForwarder(InnerTransporter &inner)
Construct a forwarder for the intermodule layer.
Definition intermodule.h:87
InterModulePortalBase(InnerTransporter &inner)
void _publish_serialized(std::string type_name, int scheme, const std::vector< char > &bytes, const goby::middleware::Group &group)
void _unsubscribe_all(const std::string &subscriber_id=middleware::identifier_part_to_string(std::this_thread::get_id()))
void _receive_subscription_forwarded(const std::shared_ptr< const middleware::SerializationHandlerBase<> > &subscription)
Base class for implementing transporters (both portal and forwarder) for the interprocess layer.
static constexpr int scheme()
returns the marshalling scheme id for a given data type on this layer
void unsubscribe_all()
Unsubscribe from all current subscriptions.
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition publisher.h:40
const goby::middleware::protobuf::TransporterConfig & cfg() const
Returns the metadata configuration.
Definition publisher.h:81
void subscribe(std::function< void(const Data &)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (const reference variant)
Definition interface.h:301
void unsubscribe(const Subscriber< Data > &subscriber=Subscriber< Data >())
Unsubscribe to a specific group and data type.
Definition interface.h:354
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
Definition subscriber.h:37
goby::util::logger::GroupSetter group(std::string n)
bool operator<(const TCPEndPoint &ep_a, const TCPEndPoint &ep_b)
std::string full_process_and_thread_id(std::thread::id i=std::this_thread::get_id())
Definition common.h:104
middleware::InterModuleForwarder< InnerTransporter, detail::InterModuleTag > InterModuleForwarder
Definition intermodule.h:43
The global namespace for the Goby project.
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::PROTOBUF_NAMESPACE_ID::MessageOptions, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
Definition interface.h:98
static std::string type_name()
The marshalling scheme specific string name for this type.
Definition interface.h:107