25 #ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERPROCESS_H
26 #define GOBY_MIDDLEWARE_TRANSPORT_INTERPROCESS_H
30 #include <sys/types.h>
50 template <
typename Derived,
typename InnerTransporter>
54 public Poller<InterProcessTransporterBase<Derived, InnerTransporter>>
78 template <typename Data, int scheme = scheme<Data>()>
83 static_cast<Derived*
>(
this)->
template _publish<Data, scheme>(data,
group, publisher);
84 this->
inner().template publish_dynamic<Data, scheme>(data,
group, publisher);
94 template <typename Data, int scheme = scheme<Data>()>
101 static_cast<Derived*
>(
this)->
template _publish<Data, scheme>(*data,
group, publisher);
102 this->
inner().template publish_dynamic<Data, scheme>(data,
group, publisher);
113 template <typename Data, int scheme = scheme<Data>()>
117 publish_dynamic<Data, scheme>(std::shared_ptr<const Data>(data),
group, publisher);
125 static_cast<Derived*
>(
this)->_publish_serialized(type_name,
scheme, bytes,
group);
135 template <typename Data, int scheme = scheme<Data>()>
140 static_cast<Derived*
>(
this)->
template _subscribe<Data, scheme>(
141 [=](std::shared_ptr<const Data> d) { f(*d); },
group, subscriber);
151 template <typename Data, int scheme = scheme<Data>()>
156 static_cast<Derived*
>(
this)->
template _subscribe<Data, scheme>(f,
group, subscriber);
164 template <typename Data, int scheme = scheme<Data>()>
169 static_cast<Derived*
>(
this)->
template _unsubscribe<Data, scheme>(
group, subscriber);
182 std::shared_ptr<SerializationSubscriptionRegex>
186 const std::set<int>& schemes,
const std::string& type_regex =
".*",
187 const std::string& group_regex =
".*")
189 return static_cast<Derived*
>(
this)->_subscribe_regex(f, schemes, type_regex, group_regex);
201 template <typename Data, int scheme = scheme<Data>()>
203 std::function<
void(std::shared_ptr<const Data>,
const std::string&
type)> f,
204 const Group&
group,
const std::string& type_regex =
".*")
206 std::regex special_chars{R
"([-[\]{}()*+?.,\^$|#\s])"};
207 std::string sanitized_group =
208 std::regex_replace(std::string(group), special_chars, R"(\$&)");
210 auto regex_lambda = [=](
const std::vector<unsigned char>& data,
int schm,
211 const std::string&
type,
const Group& grp) {
212 auto data_begin = data.begin(), data_end = data.end(), actual_end = data.end();
218 return static_cast<Derived*
>(
this)->_subscribe_regex(regex_lambda, {
scheme}, type_regex,
219 "^" + sanitized_group +
"$");
230 template <const Group& group, typename Data, int scheme = scheme<Data>()>
232 std::function<
void(std::shared_ptr<const Data>,
const std::string&
type)> f,
233 const std::string& type_regex =
".*")
242 template <
typename Data>
static constexpr
int scheme()
244 int scheme = goby::middleware::scheme<Data>();
256 static_assert((
group.c_str() !=
nullptr) && (
group.c_str()[0] !=
'\0'),
257 "goby::middleware::Group must have non-zero length string to publish on the "
258 "InterProcess layer");
264 if ((
group.c_str() ==
nullptr) || (
group.c_str()[0] ==
'\0'))
265 throw(
goby::Exception(
"Group must have a non-empty string for use on InterProcess"));
275 int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>&
lock)
277 return static_cast<Derived*
>(
this)->_poll(
lock);
281 template <
typename Derived,
typename InnerTransporter>
284 template <
typename Derived,
typename InnerTransporter>
287 template <
typename Derived,
typename InnerTransporter>
295 template <
typename InnerTransporter>
296 class InterProcessForwarder
297 :
public InterProcessTransporterBase<InterProcessForwarder<InnerTransporter>, InnerTransporter>
312 std::shared_ptr<const goby::middleware::protobuf::SerializerTransporterMessage>
313 msg) { _receive_regex_data_forwarded(
msg); });
317 this->unsubscribe_all();
326 template <
typename Data,
int scheme>
331 std::string* sbytes =
new std::string(bytes.begin(), bytes.end());
332 auto msg = std::make_shared<goby::middleware::protobuf::SerializerTransporterMessage>();
333 auto* key =
msg->mutable_key();
335 key->set_marshalling_scheme(
scheme);
337 key->set_group(std::string(
group));
338 msg->set_allocated_data(sbytes);
340 *key->mutable_cfg() = publisher.
cfg();
342 this->inner().template publish<Base::to_portal_group_>(
msg);
345 void _publish_serialized(std::string type_name,
int scheme,
const std::vector<char>& bytes,
348 auto msg = std::make_shared<goby::middleware::protobuf::SerializerTransporterMessage>();
349 auto* key =
msg->mutable_key();
351 key->set_marshalling_scheme(
scheme);
352 key->set_type(type_name);
353 key->set_group(std::string(
group));
354 msg->set_data(std::string(bytes.begin(), bytes.end()));
356 this->inner().template publish<Base::to_portal_group_>(
msg);
359 template <
typename Data,
int scheme>
360 void _subscribe(std::function<
void(std::shared_ptr<const Data> d)> f,
const Group&
group,
361 const Subscriber<Data>& subscriber)
363 this->inner().template subscribe_dynamic<Data, scheme>(f,
group);
366 auto inner_publication_lambda = [=](std::shared_ptr<const Data> d) {
367 this->inner().template publish_dynamic<Data, scheme>(d,
group);
370 auto subscription = std::make_shared<SerializationSubscription<Data, scheme>>(
371 inner_publication_lambda,
group,
373 [=](
const Data& d) {
return group; }));
375 this->inner().template publish<Base::to_portal_group_, SerializationHandlerBase<>>(
379 template <
typename Data,
int scheme>
380 void _unsubscribe(
const Group&
group,
const Subscriber<Data>& subscriber)
382 this->inner().template unsubscribe_dynamic<Data, scheme>(
group, subscriber);
384 auto unsubscription = std::shared_ptr<SerializationHandlerBase<>>(
385 new SerializationUnSubscription<Data, scheme>(
group));
387 this->inner().template publish<Base::to_portal_group_, SerializationHandlerBase<>>(
391 void _unsubscribe_all()
393 regex_subscriptions_.clear();
394 auto all = std::make_shared<SerializationUnSubscribeAll>();
395 this->inner().template publish<Base::to_portal_group_, SerializationUnSubscribeAll>(all);
398 std::shared_ptr<SerializationSubscriptionRegex>
399 _subscribe_regex(std::function<
void(
const std::vector<unsigned char>&,
int scheme,
400 const std::string&
type,
const Group&
group)>
402 const std::set<int>& schemes,
const std::string& type_regex =
".*",
403 const std::string& group_regex =
".*")
405 auto inner_publication_lambda = [=](
const std::vector<unsigned char>& data,
int scheme,
406 const std::string&
type,
const Group&
group) {
407 std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage>
409 forwarded_data->mutable_key()->set_marshalling_scheme(
scheme);
410 forwarded_data->mutable_key()->set_type(
type);
411 forwarded_data->mutable_key()->set_group(
group);
412 forwarded_data->set_data(std::string(data.begin(), data.end()));
413 this->inner().template publish<Base::regex_group_>(forwarded_data);
416 auto portal_subscription = std::make_shared<SerializationSubscriptionRegex>(
417 inner_publication_lambda, schemes, type_regex, group_regex);
418 this->inner().template publish<Base::to_portal_group_, SerializationSubscriptionRegex>(
419 portal_subscription);
421 auto local_subscription = std::shared_ptr<SerializationSubscriptionRegex>(
422 new SerializationSubscriptionRegex(f, schemes, type_regex, group_regex));
423 regex_subscriptions_.insert(local_subscription);
424 return local_subscription;
427 void _receive_regex_data_forwarded(
428 std::shared_ptr<const goby::middleware::protobuf::SerializerTransporterMessage>
msg)
430 const auto& bytes =
msg->data();
431 for (
auto& sub : regex_subscriptions_)
432 sub->post(bytes.begin(), bytes.end(),
msg->key().marshalling_scheme(),
433 msg->key().type(),
msg->key().group());
436 int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>&
lock)
442 std::set<std::shared_ptr<const SerializationSubscriptionRegex>> regex_subscriptions_;
445 template <
typename Derived,
typename InnerTransporter>
460 this->
inner().template subscribe<Base::to_portal_group_, SerializerTransporterMessage>(
461 [
this](std::shared_ptr<const SerializerTransporterMessage> d) {
462 static_cast<Derived*
>(
this)->_receive_publication_forwarded(*d);
465 this->
inner().template subscribe<Base::to_portal_group_, SerializationHandlerBase<>>(
466 [
this](std::shared_ptr<const middleware::SerializationHandlerBase<>> s) {
467 static_cast<Derived*
>(
this)->_receive_subscription_forwarded(s);
470 this->
inner().template subscribe<Base::to_portal_group_, SerializationSubscriptionRegex>(
471 [
this](std::shared_ptr<const middleware::SerializationSubscriptionRegex> s) {
472 static_cast<Derived*
>(
this)->_receive_regex_subscription_forwarded(s);
475 this->
inner().template subscribe<Base::to_portal_group_, SerializationUnSubscribeAll>(
476 [
this](std::shared_ptr<const middleware::SerializationUnSubscribeAll> s) {
477 static_cast<Derived*
>(
this)->_unsubscribe_all(s->subscriber_id());