25#ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERMODULE_H
26#define GOBY_MIDDLEWARE_TRANSPORT_INTERMODULE_H
59template <
typename Derived,
typename InnerTransporter,
typename ImplementationTag>
68template <
typename InnerTransporter,
typename ImplementationTag =
void>
class InterModuleForwarder;
74template <
typename InnerTransporter,
typename ImplementationTag>
77 InnerTransporter, ImplementationTag>
82 InnerTransporter, ImplementationTag>;
93 template <
typename Data,
int scheme>
99 std::string* sbytes =
new std::string(bytes.begin(), bytes.end());
101 auto* key =
msg.mutable_key();
103 key->set_marshalling_scheme(
scheme);
105 key->set_group(std::string(
group));
106 msg.set_allocated_data(sbytes);
108 *key->mutable_cfg() = publisher.
cfg();
109 this->
inner().template publish<Base::to_portal_group_>(
msg);
112 template <
typename Data,
int scheme>
113 void _subscribe(std::function<
void(std::shared_ptr<const Data> d)> f,
const Group&
group,
116 if (subscriptions_.empty())
122 auto range = subscriptions_.equal_range(
msg.key());
123 for (
auto it = range.first; it != range.second; ++it)
125 it->second->post(
msg.data().begin(),
msg.data().end());
129 auto local_subscription = std::make_shared<SerializationSubscription<Data, scheme>>(
132 [=](
const Data& d) {
return group; }));
135 Subscription subscription;
137 subscription.mutable_key()->set_marshalling_scheme(
scheme);
139 subscription.mutable_key()->set_group(std::string(
group));
140 subscription.set_action(Subscription::SUBSCRIBE);
142 this->
inner().template publish<Base::to_portal_group_>(subscription);
144 subscriptions_.insert(std::make_pair(subscription.key(), local_subscription));
147 template <
typename Data,
int scheme>
void _unsubscribe(
const Group&
group)
150 Subscription unsubscription;
152 unsubscription.mutable_key()->set_marshalling_scheme(
scheme);
154 unsubscription.mutable_key()->set_group(std::string(
group));
155 unsubscription.set_action(Subscription::UNSUBSCRIBE);
156 this->
inner().template publish<Base::to_portal_group_>(unsubscription);
158 subscriptions_.erase(unsubscription.key());
160 if (subscriptions_.empty())
163 protobuf::SerializerTransporterMessage>();
166 void _unsubscribe_all()
169 Subscription unsubscription;
171 unsubscription.set_action(Subscription::UNSUBSCRIBE_ALL);
172 this->
inner().template publish<Base::to_portal_group_>(unsubscription);
174 subscriptions_.clear();
177 protobuf::SerializerTransporterMessage>();
189 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock)
195 std::multimap<protobuf::SerializerTransporterKey,
196 std::shared_ptr<const middleware::SerializationHandlerBase<>>>
200template <
typename Derived,
typename InnerTransporter,
typename ImplementationTag>
219 this->
inner().template subscribe<Base::to_portal_group_, SerializerTransporterMessage>(
220 [
this](
const SerializerTransporterMessage& d)
222 std::vector<char> data(d.data().begin(), d.data().end());
224 d.key().type(), d.key().marshalling_scheme(), data,
228 this->
inner().template subscribe<Base::to_portal_group_, Subscription>(
229 [
this](
const Subscription& s)
231 auto on_subscribe = [
this](
const SerializerTransporterMessage& d)
232 { this->
inner().template publish<Base::from_portal_group_>(d); };
233 auto sub = std::make_shared<SerializationInterModuleSubscription>(on_subscribe, s);
237 case Subscription::SUBSCRIBE:
238 case Subscription::UNSUBSCRIBE:
241 case Subscription::UNSUBSCRIBE_ALL:
263template <
typename InnerTransporter>
268 using Base = InterModuleForwarder<InnerTransporter, zeromq::detail::InterModuleTag>;
270 [[deprecated(
"Use zeromq::InterModuleForwarder<> or udpm::InterModuleForwarder<> instead of "
271 "middleware::InterModuleForwarder<>")]]
Implementation of Group for dynamic (run-time) instantiations. Use Group directly for static (compile-time) instantiations.
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
InnerTransporter & inner()
Implements the forwarder concept for the intermodule layer.
InterModuleTransporterBase< InterModuleForwarder< InnerTransporter, ImplementationTag >, InnerTransporter, ImplementationTag > Base
InterModuleForwarder(InnerTransporter &inner)
Construct a forwarder for the intermodule layer.
virtual ~InterModuleForwarder()
InterModulePortalBase(InnerTransporter &inner)
virtual ~InterModulePortalBase()
void _publish_serialized(std::string type_name, int scheme, const std::vector< char > &bytes, const goby::middleware::Group &group)
void _unsubscribe_all(const std::string &subscriber_id=middleware::identifier_part_to_string(std::this_thread::get_id()))
void _receive_subscription_forwarded(const std::shared_ptr< const middleware::SerializationHandlerBase<> > &subscription)
Base class for implementing transporters (both portal and forwarder) for the interprocess layer.
static constexpr int scheme()
Returns the marshalling scheme id for a given data type on this layer.
static constexpr Group from_portal_group_
void unsubscribe_all()
Unsubscribe from all current subscriptions.
Class that holds additional metadata and callback functions related to a publication (and is optional...
const goby::middleware::protobuf::TransporterConfig & cfg() const
Returns the metadata configuration.
void subscribe(std::function< void(const Data &)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (const reference variant)
void unsubscribe(const Subscriber< Data > &subscriber=Subscriber< Data >())
Unsubscribe from a specific group and data type.
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
void set_id(ArgT0 &&arg0, ArgT... args)
const std::string & type() const
int32_t marshalling_scheme() const
const std::string & group() const
goby::util::logger::GroupSetter group(std::string n)
bool operator<(const TCPEndPoint &ep_a, const TCPEndPoint &ep_b)
std::string full_process_and_thread_id(std::thread::id i=std::this_thread::get_id())
middleware::InterModuleForwarder< InnerTransporter, detail::InterModuleTag > InterModuleForwarder
The global namespace for the Goby project.
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::PROTOBUF_NAMESPACE_ID::MessageOptions, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
static std::string type_name()
The marshalling scheme specific string name for this type.