Goby3 3.2.3
2025.05.13
Loading...
Searching...
No Matches
intermodule.h
Go to the documentation of this file.
1// Copyright 2016-2023:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6//
7//
8// This file is part of the Goby Underwater Autonomy Project Libraries
9// ("The Goby Libraries").
10//
11// The Goby Libraries are free software: you can redistribute them and/or modify
12// them under the terms of the GNU Lesser General Public License as published by
13// the Free Software Foundation, either version 2.1 of the License, or
14// (at your option) any later version.
15//
16// The Goby Libraries are distributed in the hope that they will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU Lesser General Public License for more details.
20//
21// You should have received a copy of the GNU Lesser General Public License
22// along with Goby. If not, see <http://www.gnu.org/licenses/>.
23
24#ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERMODULE_H
25#define GOBY_MIDDLEWARE_TRANSPORT_INTERMODULE_H
26
27#include <atomic>
28#include <functional>
29#include <sys/types.h>
30#include <thread>
31#include <tuple>
32#include <unistd.h>
33
35
41
42namespace goby
43{
44namespace middleware
45{
46namespace protobuf
47{
49{
50 return k1.marshalling_scheme() != k2.marshalling_scheme()
52 : (k1.type() != k2.type()
53 ? (k1.type() < k2.type())
54 : (k1.group() != k2.group() ? (k1.group() < k2.group()) : false));
55}
56} // namespace protobuf
57
58template <typename Derived, typename InnerTransporter>
60
65template <typename InnerTransporter>
67 : public InterModuleTransporterBase<InterModuleForwarder<InnerTransporter>, InnerTransporter>
68{
69 public:
70 using Base =
72
76 InterModuleForwarder(InnerTransporter& inner) : Base(inner) {}
78
79 friend Base;
80
81 private:
82 template <typename Data, int scheme>
83 void _publish(const Data& d, const Group& group, const Publisher<Data>& publisher)
84 {
85 //create and forward publication to edge
86
87 std::vector<char> bytes(SerializerParserHelper<Data, scheme>::serialize(d));
88 std::string* sbytes = new std::string(bytes.begin(), bytes.end());
90 auto* key = msg.mutable_key();
91
92 key->set_marshalling_scheme(scheme);
94 key->set_group(std::string(group));
95 msg.set_allocated_data(sbytes);
96
97 *key->mutable_cfg() = publisher.cfg();
98 this->inner().template publish<Base::to_portal_group_>(msg);
99 }
100
101 template <typename Data, int scheme>
102 void _subscribe(std::function<void(std::shared_ptr<const Data> d)> f, const Group& group,
103 const Subscriber<Data>& subscriber)
104 {
105 if (subscriptions_.empty())
106 this->inner()
110 auto range = subscriptions_.equal_range(msg.key());
111 for (auto it = range.first; it != range.second; ++it)
112 { it->second->post(msg.data().begin(), msg.data().end()); }
113 });
114
115 auto local_subscription = std::make_shared<SerializationSubscription<Data, scheme>>(
116 f, group,
117 middleware::Subscriber<Data>(goby::middleware::protobuf::TransporterConfig(),
118 [=](const Data& d) { return group; }));
119
121 Subscription subscription;
122 subscription.set_id(full_process_and_thread_id());
123 subscription.mutable_key()->set_marshalling_scheme(scheme);
124 subscription.mutable_key()->set_type(SerializerParserHelper<Data, scheme>::type_name());
125 subscription.mutable_key()->set_group(std::string(group));
126 subscription.set_action(Subscription::SUBSCRIBE);
127
128 this->inner().template publish<Base::to_portal_group_>(subscription);
129
130 subscriptions_.insert(std::make_pair(subscription.key(), local_subscription));
131 }
132
133 template <typename Data, int scheme> void _unsubscribe(const Group& group)
134 {
136 Subscription unsubscription;
137 unsubscription.set_id(full_process_and_thread_id());
138 unsubscription.mutable_key()->set_marshalling_scheme(scheme);
139 unsubscription.mutable_key()->set_type(SerializerParserHelper<Data, scheme>::type_name());
140 unsubscription.mutable_key()->set_group(std::string(group));
141 unsubscription.set_action(Subscription::UNSUBSCRIBE);
142 this->inner().template publish<Base::to_portal_group_>(unsubscription);
143
144 subscriptions_.erase(unsubscription.key());
145
146 if (subscriptions_.empty())
147 this->inner()
149 protobuf::SerializerTransporterMessage>();
150 }
151
152 void _unsubscribe_all()
153 {
155 Subscription unsubscription;
156 unsubscription.set_id(full_process_and_thread_id());
157 unsubscription.set_action(Subscription::UNSUBSCRIBE_ALL);
158 this->inner().template publish<Base::to_portal_group_>(unsubscription);
159
160 subscriptions_.clear();
161 this->inner()
163 protobuf::SerializerTransporterMessage>();
164 }
165
166 // not yet implemented
167 // void _subscribe_regex(std::function<void(const std::vector<unsigned char>&, int scheme,
168 // const std::string& type, const Group& group)>
169 // f,
170 // const std::set<int>& schemes, const std::string& type_regex = ".*",
171 // const std::string& group_regex = ".*")
172 // {
173 // }
174
175 int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock)
176 {
177 return 0;
178 } // A forwarder is a shell, only the inner Transporter has data
179
180 private:
181 std::multimap<protobuf::SerializerTransporterKey,
182 std::shared_ptr<const middleware::SerializationHandlerBase<>>>
183 subscriptions_;
184};
185
186template <typename Derived, typename InnerTransporter>
187class InterModulePortalBase : public InterModuleTransporterBase<Derived, InnerTransporter>
188{
189 public:
191
192 InterModulePortalBase(InnerTransporter& inner) : Base(inner) { _init(); }
194
196
197 private:
198 void _init()
199 {
202 this->inner().template subscribe<Base::to_portal_group_, SerializerTransporterMessage>(
203 [this](const SerializerTransporterMessage& d) {
204 static_cast<Derived*>(this)->_receive_publication_forwarded(d);
205 });
206
207 this->inner().template subscribe<Base::to_portal_group_, Subscription>(
208 [this](const Subscription& s) {
209 auto on_subscribe = [this](const SerializerTransporterMessage& d) {
210 this->inner().template publish<Base::from_portal_group_>(d);
211 };
212 auto sub = std::make_shared<SerializationInterModuleSubscription>(on_subscribe, s);
213
214 switch (s.action())
215 {
216 case Subscription::SUBSCRIBE:
217 case Subscription::UNSUBSCRIBE:
218 static_cast<Derived*>(this)->_receive_subscription_forwarded(sub);
219 break;
220 case Subscription::UNSUBSCRIBE_ALL:
221 static_cast<Derived*>(this)->_unsubscribe_all(s.id());
222 break;
223 }
224 });
225 }
226};
227
228} // namespace middleware
229} // namespace goby
230
231#endif
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:60
Implements the forwarder concept for the intermodule layer.
Definition intermodule.h:68
InterModuleForwarder(InnerTransporter &inner)
Construct a forwarder for the intermodule layer.
Definition intermodule.h:76
InterModulePortalBase(InnerTransporter &inner)
Base class for implementing transporters (both portal and forwarder) for the interprocess layer.
static constexpr int scheme()
returns the marshalling scheme id for a given data type on this layer
void unsubscribe_all()
Unsubscribe from all current subscriptions.
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition publisher.h:40
const goby::middleware::protobuf::TransporterConfig & cfg() const
Returns the metadata configuration.
Definition publisher.h:81
void subscribe(std::function< void(const Data &)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (const reference variant)
Definition interface.h:270
void unsubscribe(const Subscriber< Data > &subscriber=Subscriber< Data >())
Unsubscribe to a specific group and data type.
Definition interface.h:323
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
Definition subscriber.h:37
goby::util::logger::GroupSetter group(std::string n)
bool operator<(const TCPEndPoint &ep_a, const TCPEndPoint &ep_b)
std::string full_process_and_thread_id(std::thread::id i=std::this_thread::get_id())
Definition common.h:104
The global namespace for the Goby project.
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::PROTOBUF_NAMESPACE_ID::MessageOptions, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
Definition interface.h:98
static std::string type_name()
The marshalling scheme specific string name for this type.
Definition interface.h:107