24#ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERVEHICLE_H
25#define GOBY_MIDDLEWARE_TRANSPORT_INTERVEHICLE_H
33#include <google/protobuf/io/zero_copy_stream_impl.h>
69template <
typename Derived,
typename InnerTransporter>
73 public Poller<InterVehicleTransporterBase<Derived, InnerTransporter>>
98 << request.ShortDebugString() << std::endl;
100 switch (request.request())
103 omit_publish_metadata_.erase(request.key().type());
106 omit_publish_metadata_.insert(request.key().type());
116 template <
typename Data>
static constexpr int scheme()
118 static_assert(goby::middleware::scheme<typename detail::primitive_type<Data>::type>() ==
120 "Can only use DCCL messages with InterVehicleTransporters");
130 "goby::middleware::Group must have non-zero numeric "
131 "value to publish on the InterVehicle layer");
141 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
146 "Can only use DCCL messages with InterVehicleTransporters");
148 Data data_with_group = data;
149 publisher.set_group(data_with_group,
group);
151 static_cast<Derived*
>(
this)->
template _publish<Data>(data_with_group,
group, publisher);
153 this->
inner().template publish_dynamic<Data, MarshallingScheme::DCCL>(data_with_group,
155 this->
inner().template publish_dynamic<Data, MarshallingScheme::PROTOBUF>(data_with_group,
166 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
171 "Can only use DCCL messages with InterVehicleTransporters");
175 std::shared_ptr<Data> data_with_group(data->New());
176 data_with_group->CopyFrom(*data);
178 publisher.set_group(*data_with_group,
group);
180 static_cast<Derived*
>(
this)->
template _publish<Data>(*data_with_group,
group,
184 this->
inner().template publish_dynamic<Data, MarshallingScheme::DCCL>(data_with_group,
186 this->
inner().template publish_dynamic<Data, MarshallingScheme::PROTOBUF>(
187 data_with_group,
group, publisher);
198 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
202 publish_dynamic<Data, scheme>(std::shared_ptr<const Data>(data),
group, publisher);
212 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
217 "Can only use DCCL messages with InterVehicleTransporters");
218 auto pointer_ref_lambda = [=](std::shared_ptr<const Data> d) { f(*d); };
219 static_cast<Derived*
>(
this)->
template _subscribe<Data>(
230 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
236 "Can only use DCCL messages with InterVehicleTransporters");
237 static_cast<Derived*
>(
this)->
template _subscribe<Data>(f,
group, subscriber,
247 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
252 "Can only use DCCL messages with InterVehicleTransporters");
253 static_cast<Derived*
>(
this)->
template _subscribe<Data>(
254 std::function<
void(std::shared_ptr<const Data>)>(),
group, subscriber,
259 template <
typename Data>
260 std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage>
265 std::stringstream ss;
266 ss <<
"Error: Publisher must have set_group_func in order to publish to a "
267 "non-broadcast Group ("
269 <<
"). The set_group_func modifies the contents of the outgoing message to store "
270 "the group information.";
278 auto ack_handler = std::make_shared<
282 auto expire_handler =
288 data, ack_handler, expire_handler);
291 if (!omit_publish_metadata_.count(data->key().type()))
292 _set_protobuf_metadata<Data>(data->mutable_key()->mutable_metadata(), d);
295 goby::glog <<
"Set up publishing for: " << data->ShortDebugString() << std::endl;
300 template <
typename Data>
301 std::shared_ptr<intervehicle::protobuf::Subscription>
313 std::stringstream ss;
314 ss <<
"Error: Subscriber must have group_func in order to subscribe to "
315 "non-broadcast Group ("
317 <<
"). The group_func returns the appropriate Group based on the contents "
318 "of the incoming message.";
325 std::stringstream ss;
326 ss <<
"Error: Broadcast subscriptions cannot have ack_required: true";
330 auto subscription = std::make_shared<
332 func,
group, subscriber);
346 std::stringstream ss;
347 ss <<
"Cannot unsubscribe to DCCL id: " << dccl_id
348 <<
" and group: " << std::string(
group) <<
" as no subscription was found.";
355 auto dccl_subscription =
356 this->
template _serialize_subscription<Data>(
group, subscriber, action);
364 auto subscribe_time = dccl_subscription->time_with_units();
365 subscription_publication->mutable_key()->set_serialize_time_with_units(subscribe_time);
371 auto expire_handler =
377 << subscription_publication->ShortDebugString()
380 this->pending_ack_.insert(std::make_pair(*subscription_publication,
381 std::make_tuple(ack_handler, expire_handler)));
383 return dccl_subscription;
386 template <
int tuple_index,
typename AckorExpirePair>
389 auto original = ack_or_expire_pair.serializer();
390 const auto& ack_or_expire_msg = ack_or_expire_pair.data();
392 original.key().type() ==
398 auto bytes_begin = original.data().begin(), bytes_end = original.data().end();
399 decltype(bytes_begin) actual_end;
403 auto subscription = Helper::parse(bytes_begin, bytes_end, actual_end);
404 subscription->mutable_header()->set_src(0);
406 std::vector<char> bytes(Helper::serialize(*subscription));
407 std::string* sbytes =
new std::string(bytes.begin(), bytes.end());
408 original.set_allocated_data(sbytes);
411 auto it = pending_ack_.find(original);
412 if (it != pending_ack_.end())
415 <<
" for: " << original.ShortDebugString() <<
", "
416 << ack_or_expire_msg.ShortDebugString()
419 std::get<tuple_index>(it->second)
420 ->post(original.data().begin(), original.data().end(), ack_or_expire_msg);
425 << (is_subscription ?
"subscription: " :
"data: ")
426 << original.ShortDebugString() << std::endl;
433 << packets.ShortDebugString() << std::endl;
435 for (
const auto& packet : packets.
frame())
438 p.second->post(packet.data().begin(), packet.data().end(), packets.
header());
442 template <
typename Data>
443 std::shared_ptr<intervehicle::protobuf::Subscription>
448 auto dccl_subscription = std::make_shared<intervehicle::protobuf::Subscription>();
449 dccl_subscription->mutable_header()->set_src(0);
452 dccl_subscription->mutable_header()->add_dest(
id);
455 dccl_subscription->set_dccl_id(dccl_id);
456 dccl_subscription->set_group(
group.numeric());
457 dccl_subscription->set_time_with_units(
458 goby::time::SystemClock::now<goby::time::MicroTime>());
463 _set_protobuf_metadata<Data>(dccl_subscription->mutable_metadata());
464 *dccl_subscription->mutable_intervehicle() = subscriber.
cfg().
intervehicle();
465 return dccl_subscription;
469 int dccl_id, std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage> data,
475 << data->ShortDebugString() << std::endl;
477 this->pending_ack_.insert(
478 std::make_pair(*data, std::make_tuple(ack_handler, expire_handler)));
491 int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock)
493 _expire_pending_ack();
495 return static_cast<Derived*
>(
this)->_poll(lock);
501 _insert_file_desc_with_dependencies(Data::descriptor()->file(), meta);
504 template <
typename Data>
505 void _set_protobuf_metadata(protobuf::SerializerProtobufMetadata* meta,
const Data& d)
507 meta->set_protobuf_name(
509 _insert_file_desc_with_dependencies(d.GetDescriptor()->file(), meta);
513 void _insert_file_desc_with_dependencies(
const google::protobuf::FileDescriptor* file_desc,
514 protobuf::SerializerProtobufMetadata* meta)
516 for (
int i = 0, n = file_desc->dependency_count(); i < n; ++i)
517 _insert_file_desc_with_dependencies(file_desc->dependency(i), meta);
519 google::protobuf::FileDescriptorProto* file_desc_proto = meta->add_file_descriptor();
520 file_desc->CopyTo(file_desc_proto);
524 void _expire_pending_ack()
526 auto now = goby::time::SystemClock::now<goby::time::MicroTime>();
527 for (
auto it = pending_ack_.begin(), end = pending_ack_.end(); it != end;)
530 ->FindFieldByName(
"ttl")
532 .GetExtension(dccl::field)
536 decltype(now) serialize_time(it->first.key().serialize_time_with_units());
537 decltype(now) expire_time(serialize_time + max_ttl);
540 const decltype(now) interprocess_wait(1.0 * boost::units::si::seconds);
544 if (now > expire_time + interprocess_wait)
547 << it->first.ShortDebugString() << std::endl;
548 it = pending_ack_.erase(it);
562 protobuf::SerializerTransporterMessage,
563 std::tuple<std::shared_ptr<SerializationHandlerBase<intervehicle::protobuf::AckData>>,
564 std::shared_ptr<SerializationHandlerBase<intervehicle::protobuf::ExpireData>>>>
568 std::set<std::string> omit_publish_metadata_;
575template <
typename InnerTransporter>
595 this->
inner().template subscribe<intervehicle::groups::modem_ack_in, ack_pair_type>(
596 [
this](
const ack_pair_type& ack_pair)
597 { this->
template _handle_ack_or_expire<0>(ack_pair); });
600 this->
inner().template subscribe<intervehicle::groups::modem_expire_in, expire_pair_type>(
601 [
this](
const expire_pair_type& expire_pair)
602 { this->
template _handle_ack_or_expire<1>(expire_pair); });
610 template <
typename Data>
613 this->
inner().template publish<intervehicle::groups::modem_data_out>(
617 template <
typename Data>
618 void _subscribe(std::function<
void(std::shared_ptr<const Data> d)> func,
const Group&
group,
629 catch (
const InvalidUnsubscription& e)
635 int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock) {
return 0; }
641template <
typename InnerTransporter>
668 for (
auto& modem_driver_data : modem_drivers_)
670 modem_driver_data->driver_thread_alive =
false;
671 if (modem_driver_data->underlying_thread)
672 modem_driver_data->underlying_thread->join();
679 template <
typename Data>
682 this->
innermost().template publish<intervehicle::groups::modem_data_out>(
686 template <
typename Data>
687 void _subscribe(std::function<
void(std::shared_ptr<const Data> d)> func,
const Group&
group,
694 this->
innermost().template publish<intervehicle::groups::modem_subscription_forward_tx>(
697 catch (
const InvalidUnsubscription& e)
703 int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock)
707 while (!received_.empty())
710 received_.pop_front();
723 using intervehicle::protobuf::Subscription;
724 auto subscribe_lambda = [=](std::shared_ptr<const Subscription> d)
728 intervehicle::protobuf::Subscription,
731 auto subscription = std::make_shared<
732 IntervehicleSerializationSubscription<Subscription, MarshallingScheme::DCCL>>(
735 auto dccl_id = SerializerParserHelper<Subscription, MarshallingScheme::DCCL>::id();
737 std::make_pair(subscription->subscribed_group(), subscription));
740 this->
innermost().template subscribe<intervehicle::groups::modem_data_in>(
741 [
this](
const intervehicle::protobuf::DCCLForwardedData&
msg)
742 { received_.push_back(
msg); });
747 using ack_pair_type = intervehicle::protobuf::AckMessagePair;
748 this->
innermost().template subscribe<intervehicle::groups::modem_ack_in, ack_pair_type>(
749 [
this](
const ack_pair_type& ack_pair)
750 { this->
template _handle_ack_or_expire<0>(ack_pair); });
752 using expire_pair_type = intervehicle::protobuf::ExpireMessagePair;
754 .template subscribe<intervehicle::groups::modem_expire_in, expire_pair_type>(
755 [
this](
const expire_pair_type& expire_pair)
756 { this->
template _handle_ack_or_expire<1>(expire_pair); });
758 this->
innermost().template subscribe<intervehicle::groups::modem_driver_ready, bool>(
759 [
this](
const bool& ready)
767 _set_up_persistent_subscriptions();
769 for (
int i = 0, n = cfg_.
link_size(); i < n; ++i)
774 link->mutable_mac()->set_modem_id(link->modem_id());
776 modem_drivers_.emplace_back(
new ModemDriverData);
777 ModemDriverData& data = *modem_drivers_.back();
779 data.underlying_thread.reset(
new std::thread(
784 data.modem_driver_thread.reset(
new intervehicle::ModemDriverThread(*link));
785 data.modem_driver_thread->run(data.driver_thread_alive);
787 catch (std::exception& e)
790 goby::glog <<
"Modem driver thread had uncaught exception: " <<
e.what()
798 std::this_thread::sleep_for(std::chrono::milliseconds(250));
801 while (drivers_ready_ < modem_drivers_.size())
805 std::this_thread::sleep_for(std::chrono::seconds(1));
812 goby::glog <<
"Begin loading subscriptions from persistent storage..." << std::endl;
813 for (
const auto& sub : former_sub_collection_.subscription())
816 intervehicle::protobuf::Subscription,
817 MarshallingScheme::PROTOBUF>(sub);
821 void _set_up_persistent_subscriptions()
828 std::stringstream file_name;
830 if (dir.back() !=
'/')
834 persist_sub_file_name_ = file_name.str();
836 std::ifstream persist_sub_ifs(persist_sub_file_name_.c_str());
839 if (persist_sub_ifs.is_open())
841 google::protobuf::TextFormat::Parser parser;
842 google::protobuf::io::IstreamInputStream iis(&persist_sub_ifs);
843 parser.Parse(&iis, &former_sub_collection_);
848 goby::glog <<
"Could not open persistent subscriptions file: "
849 << persist_sub_file_name_
850 <<
". Assuming no persistent subscriptions exist" << std::endl;
853 catch (
const std::exception& e)
856 goby::glog <<
"Error reading persistent subscriptions file: " <<
e.what()
861 std::ofstream persist_sub_ofs(persist_sub_file_name_.c_str());
862 if (!persist_sub_ofs.is_open())
865 goby::glog <<
"Could not open persistent subscriptions file for writing: "
866 << persist_sub_file_name_ << std::endl;
868 remove(persist_sub_file_name_.c_str());
870 this->
innermost().template subscribe<intervehicle::groups::subscription_report>(
871 [
this](
const intervehicle::protobuf::SubscriptionReport& report)
874 <<
report.ShortDebugString() << std::endl;
876 std::ofstream persist_sub_ofs(persist_sub_file_name_.c_str());
877 intervehicle::protobuf::SubscriptionPersistCollection collection;
878 collection.set_time_with_units(
879 goby::time::SystemClock::now<goby::time::MicroTime>());
880 for (
auto report_p : sub_reports_)
882 for (
const auto& sub : report_p.second.subscription())
883 *collection.add_subscription() = sub;
885 google::protobuf::TextFormat::Printer printer;
886 google::protobuf::io::OstreamOutputStream oos(&persist_sub_ofs);
888 goby::glog <<
"Collection: " << collection.ShortDebugString() << std::endl;
889 printer.Print(collection, &oos);
894 intervehicle::protobuf::PortalConfig cfg_;
896 struct ModemDriverData
898 std::unique_ptr<std::thread> underlying_thread;
899 std::unique_ptr<intervehicle::ModemDriverThread> modem_driver_thread;
900 std::atomic<bool> driver_thread_alive{
true};
902 std::vector<std::unique_ptr<ModemDriverData>> modem_drivers_;
903 unsigned drivers_ready_{0};
905 std::deque<intervehicle::protobuf::DCCLForwardedData> received_;
907 intervehicle::protobuf::SubscriptionPersistCollection former_sub_collection_;
908 std::string persist_sub_file_name_;
909 std::map<modem_id_type, intervehicle::protobuf::SubscriptionReport> sub_reports_;
simple exception class for goby applications
void set_modem_id(int32_t value)
static const ::PROTOBUF_NAMESPACE_ID::Descriptor * descriptor()
boost::units::unit< ttl_dimension, boost::units::si::system > ttl_unit
bool ack_required() const
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
static constexpr std::uint32_t broadcast_group
Special group number representing the broadcast group (used when no grouping is required for a given ...
static constexpr std::uint32_t invalid_numeric_group
Special group number representing an invalid numeric group (unsuitable for intervehicle and outer lay...
InnerTransporter & inner()
Implements the forwarder concept for the intervehicle layer.
virtual ~InterVehicleForwarder()=default
InterVehicleForwarder(InnerTransporter &inner)
Construct a forwarder for the intervehicle layer.
Implements a portal for the intervehicle layer based on Goby Acomms.
InterVehiclePortal(InnerTransporter &inner, const intervehicle::protobuf::PortalConfig &cfg)
Instantiate a portal with the given configuration and a reference to an external inner transporter.
InterVehiclePortal(const intervehicle::protobuf::PortalConfig &cfg)
Instantiate a portal with the given configuration (with the portal owning the inner transporter)
Base class for implementing transporters (both portal and forwarder) for the intervehicle layer.
std::shared_ptr< intervehicle::protobuf::Subscription > _serialize_subscription(const Group &group, const Subscriber< Data > &subscriber, SubscriptionAction action)
static constexpr int scheme()
returns the marshalling scheme id for a given data type on this layer. Only MarshallingScheme::DCCL i...
void publish_dynamic(std::shared_ptr< Data > data, const Group &group=Group(), const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to mutable data variant)....
void subscribe_dynamic(std::function< void(std::shared_ptr< const Data >)> f, const Group &group=Group(), const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (shared pointer variant)....
void subscribe_dynamic(std::function< void(const Data &)> f, const Group &group=Group(), const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (const reference variant)....
virtual ~InterVehicleTransporterBase()=default
void _handle_ack_or_expire(const AckorExpirePair &ack_or_expire_pair)
void publish_dynamic(std::shared_ptr< const Data > data, const Group &group=Group(), const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to const data variant)....
InterVehicleTransporterBase(InnerTransporter &inner)
std::shared_ptr< intervehicle::protobuf::Subscription > _set_up_subscribe(std::function< void(std::shared_ptr< const Data > d)> func, const Group &group, const Subscriber< Data > &subscriber, SubscriptionAction action)
void _receive(const intervehicle::protobuf::DCCLForwardedData &packets)
void _insert_pending_ack(int dccl_id, std::shared_ptr< goby::middleware::protobuf::SerializerTransporterMessage > data, std::shared_ptr< SerializationHandlerBase< intervehicle::protobuf::AckData > > ack_handler, std::shared_ptr< SerializationHandlerBase< intervehicle::protobuf::ExpireData > > expire_handler)
void publish_dynamic(const Data &data, const Group &group=Group(), const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (const reference variant)....
void unsubscribe_dynamic(const Group &group=Group(), const Subscriber< Data > &subscriber=Subscriber< Data >())
Unsubscribe from a specific run-time defined group and data type. Where possible, prefer the static v...
InterVehicleTransporterBase()
void check_validity()
Check validity of the Group for interthread use (at compile time)
std::unordered_map< int, std::unordered_map< std::string, std::shared_ptr< const SerializationHandlerBase< intervehicle::protobuf::Header > > > > subscriptions_
std::shared_ptr< goby::middleware::protobuf::SerializerTransporterMessage > _set_up_publish(const Data &d, const Group &group, const Publisher< Data > &publisher)
Represents a subscription to a serialized data type (intervehicle layer).
InvalidPublication(const std::string &e)
InvalidSubscription(const std::string &e)
InvalidUnsubscription(const std::string &e)
int poll(const std::chrono::time_point< Clock, Duration > &timeout=std::chrono::time_point< Clock, Duration >::max())
poll for data. Blocks until a data event occurs or a timeout when a particular time has been reached
Utility class for allowing the various Goby middleware transporters to poll the underlying transport ...
Represents a callback for a published data type (e.g. acked_func or expired_func)
Class that holds additional metadata and callback functions related to a publication (and is optional...
const goby::middleware::protobuf::TransporterConfig & cfg() const
Returns the metadata configuration.
acked_func_type acked_func() const
Returns the acked data callback (or an empty function if none is set)
bool has_set_group_func() const
expired_func_type expired_func() const
Returns the expired data callback (or an empty function if none is set)
Base class for handling posting callbacks for serialized data types (interprocess and outer)
Defines the common interface for publishing and subscribing data using static (constexpr) groups on G...
void subscribe(std::function< void(const Data &)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (const reference variant)
void publish(const Data &data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (const reference variant)
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
subscribed_func_type subscribed_func() const
bool has_group_func() const
const goby::middleware::protobuf::TransporterConfig & cfg() const
subscribe_expired_func_type subscribe_expired_func() const
goby::acomms::DynamicBuffer< buffer_data_type >::modem_id_type modem_id_type
const ::goby::middleware::intervehicle::protobuf::Header & header() const
const ::goby::middleware::intervehicle::protobuf::DCCLPacket & frame(int index) const
::goby::acomms::protobuf::DriverConfig * mutable_driver()
const std::string & name() const
const std::string & dir() const
const ::goby::middleware::intervehicle::protobuf::PortalConfig_PersistSubscriptions & persist_subscriptions() const
bool has_persist_subscriptions() const
::goby::middleware::intervehicle::protobuf::PortalConfig_LinkConfig * mutable_link(int index)
int subscription_size() const
static constexpr Action SUBSCRIBE
static constexpr Action UNSUBSCRIBE
static const ::PROTOBUF_NAMESPACE_ID::Descriptor * descriptor()
int32_t publisher_id(int index) const
const ::goby::acomms::protobuf::DynamicBufferConfig & buffer() const
const ::goby::middleware::intervehicle::protobuf::TransporterConfig & intervehicle() const
goby::util::logger::GroupSetter group(std::string n)
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::goby::acomms::protobuf::ModemReport, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::acomms::iridium::protobuf::Report >, 11, false > report
constexpr Group subscription_forward
constexpr Group modem_subscription_forward_tx
constexpr Group metadata_request
constexpr Group modem_data_in
constexpr Group modem_subscription_forward_rx
std::shared_ptr< goby::middleware::protobuf::SerializerTransporterMessage > serialize_publication(const Data &d, const Group &group, const Publisher< Data > &publisher)
The global namespace for the Goby project.
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::PROTOBUF_NAMESPACE_ID::MessageOptions, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
util::FlexOstream glog
Access the Goby logger through this object.
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
static std::string type_name()
The marshalling scheme specific string name for this type.
#define GOBY_INTERVEHICLE_API_VERSION