25#ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERVEHICLE_H
26#define GOBY_MIDDLEWARE_TRANSPORT_INTERVEHICLE_H
34#include <google/protobuf/io/zero_copy_stream_impl.h>
70template <
typename Derived,
typename InnerTransporter>
74 public Poller<InterVehicleTransporterBase<Derived, InnerTransporter>>
99 << request.ShortDebugString() << std::endl;
101 switch (request.request())
104 omit_publish_metadata_.erase(request.key().type());
107 omit_publish_metadata_.insert(request.key().type());
117 template <
typename Data>
static constexpr int scheme()
119 static_assert(goby::middleware::scheme<typename detail::primitive_type<Data>::type>() ==
121 "Can only use DCCL messages with InterVehicleTransporters");
131 "goby::middleware::Group must have non-zero numeric "
132 "value to publish on the InterVehicle layer");
142 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
147 "Can only use DCCL messages with InterVehicleTransporters");
149 Data data_with_group = data;
150 publisher.set_group(data_with_group,
group);
152 static_cast<Derived*
>(
this)->
template _publish<Data>(data_with_group,
group, publisher);
154 this->
inner().template publish_dynamic<Data, MarshallingScheme::DCCL>(data_with_group,
156 this->
inner().template publish_dynamic<Data, MarshallingScheme::PROTOBUF>(data_with_group,
167 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
172 "Can only use DCCL messages with InterVehicleTransporters");
176 std::shared_ptr<Data> data_with_group(data->New());
177 data_with_group->CopyFrom(*data);
179 publisher.set_group(*data_with_group,
group);
181 static_cast<Derived*
>(
this)->
template _publish<Data>(*data_with_group,
group,
185 this->
inner().template publish_dynamic<Data, MarshallingScheme::DCCL>(data_with_group,
187 this->
inner().template publish_dynamic<Data, MarshallingScheme::PROTOBUF>(
188 data_with_group,
group, publisher);
199 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
203 publish_dynamic<Data, scheme>(std::shared_ptr<const Data>(data),
group, publisher);
213 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
218 "Can only use DCCL messages with InterVehicleTransporters");
219 auto pointer_ref_lambda = [=](std::shared_ptr<const Data> d) { f(*d); };
220 static_cast<Derived*
>(
this)->
template _subscribe<Data>(
231 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
237 "Can only use DCCL messages with InterVehicleTransporters");
238 static_cast<Derived*
>(
this)->
template _subscribe<Data>(f,
group, subscriber,
248 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
253 "Can only use DCCL messages with InterVehicleTransporters");
254 static_cast<Derived*
>(
this)->
template _subscribe<Data>(
255 std::function<
void(std::shared_ptr<const Data>)>(),
group, subscriber,
260 template <
typename Data>
261 std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage>
266 std::stringstream ss;
267 ss <<
"Error: Publisher must have set_group_func in order to publish to a "
268 "non-broadcast Group ("
270 <<
"). The set_group_func modifies the contents of the outgoing message to store "
271 "the group information.";
279 auto ack_handler = std::make_shared<
283 auto expire_handler =
289 data, ack_handler, expire_handler);
292 if (!omit_publish_metadata_.count(data->key().type()))
293 _set_protobuf_metadata<Data>(data->mutable_key()->mutable_metadata(), d);
296 goby::glog <<
"Set up publishing for: " << data->ShortDebugString() << std::endl;
301 template <
typename Data>
302 std::shared_ptr<intervehicle::protobuf::Subscription>
314 std::stringstream ss;
315 ss <<
"Error: Subscriber must have group_func in order to subscribe to "
316 "non-broadcast Group ("
318 <<
"). The group_func returns the appropriate Group based on the contents "
319 "of the incoming message.";
326 std::stringstream ss;
327 ss <<
"Error: Broadcast subscriptions cannot have ack_required: true";
331 auto subscription = std::make_shared<
333 func,
group, subscriber);
347 std::stringstream ss;
348 ss <<
"Cannot unsubscribe to DCCL id: " << dccl_id
349 <<
" and group: " << std::string(
group) <<
" as no subscription was found.";
356 auto dccl_subscription =
357 this->
template _serialize_subscription<Data>(
group, subscriber, action);
365 auto subscribe_time = dccl_subscription->time_with_units();
366 subscription_publication->mutable_key()->set_serialize_time_with_units(subscribe_time);
372 auto expire_handler =
378 << subscription_publication->ShortDebugString()
381 this->pending_ack_.insert(std::make_pair(*subscription_publication,
382 std::make_tuple(ack_handler, expire_handler)));
384 return dccl_subscription;
387 template <
int tuple_index,
typename AckorExpirePair>
390 auto original = ack_or_expire_pair.serializer();
391 const auto& ack_or_expire_msg = ack_or_expire_pair.data();
393 original.key().type() ==
399 auto bytes_begin = original.data().begin(), bytes_end = original.data().end();
400 decltype(bytes_begin) actual_end;
404 auto subscription = Helper::parse(bytes_begin, bytes_end, actual_end);
405 subscription->mutable_header()->set_src(0);
407 std::vector<char> bytes(Helper::serialize(*subscription));
408 std::string* sbytes =
new std::string(bytes.begin(), bytes.end());
409 original.set_allocated_data(sbytes);
412 auto it = pending_ack_.find(original);
413 if (it != pending_ack_.end())
416 <<
" for: " << original.ShortDebugString() <<
", "
417 << ack_or_expire_msg.ShortDebugString()
420 std::get<tuple_index>(it->second)
421 ->post(original.data().begin(), original.data().end(), ack_or_expire_msg);
426 << (is_subscription ?
"subscription: " :
"data: ")
427 << original.ShortDebugString() << std::endl;
434 << packets.ShortDebugString() << std::endl;
436 for (
const auto& packet : packets.
frame())
439 p.second->post(packet.data().begin(), packet.data().end(), packets.
header());
443 template <
typename Data>
444 std::shared_ptr<intervehicle::protobuf::Subscription>
449 auto dccl_subscription = std::make_shared<intervehicle::protobuf::Subscription>();
450 dccl_subscription->mutable_header()->set_src(0);
453 dccl_subscription->mutable_header()->add_dest(
id);
456 dccl_subscription->set_dccl_id(dccl_id);
457 dccl_subscription->set_group(
group.numeric());
458 dccl_subscription->set_time_with_units(
459 goby::time::SystemClock::now<goby::time::MicroTime>());
464 _set_protobuf_metadata<Data>(dccl_subscription->mutable_metadata());
465 *dccl_subscription->mutable_intervehicle() = subscriber.
cfg().
intervehicle();
466 return dccl_subscription;
470 int dccl_id, std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage> data,
476 << data->ShortDebugString() << std::endl;
478 this->pending_ack_.insert(
479 std::make_pair(*data, std::make_tuple(ack_handler, expire_handler)));
492 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock)
494 _expire_pending_ack();
496 return static_cast<Derived*
>(
this)->_poll(lock);
502 _insert_file_desc_with_dependencies(Data::descriptor()->file(), meta);
505 template <
typename Data>
506 void _set_protobuf_metadata(protobuf::SerializerProtobufMetadata* meta,
const Data& d)
508 meta->set_protobuf_name(
510 _insert_file_desc_with_dependencies(d.GetDescriptor()->file(), meta);
514 void _insert_file_desc_with_dependencies(
const google::protobuf::FileDescriptor* file_desc,
515 protobuf::SerializerProtobufMetadata* meta)
517 for (
int i = 0, n = file_desc->dependency_count(); i < n; ++i)
518 _insert_file_desc_with_dependencies(file_desc->dependency(i), meta);
520 google::protobuf::FileDescriptorProto* file_desc_proto = meta->add_file_descriptor();
521 file_desc->CopyTo(file_desc_proto);
525 void _expire_pending_ack()
527 auto now = goby::time::SystemClock::now<goby::time::MicroTime>();
528 for (
auto it = pending_ack_.begin(), end = pending_ack_.end(); it != end;)
531 ->FindFieldByName(
"ttl")
533 .GetExtension(dccl::field)
537 decltype(now) serialize_time(it->first.key().serialize_time_with_units());
538 decltype(now) expire_time(serialize_time + max_ttl);
541 const decltype(now) interprocess_wait(1.0 * boost::units::si::seconds);
545 if (now > expire_time + interprocess_wait)
548 << it->first.ShortDebugString() << std::endl;
549 it = pending_ack_.erase(it);
563 protobuf::SerializerTransporterMessage,
564 std::tuple<std::shared_ptr<SerializationHandlerBase<intervehicle::protobuf::AckData>>,
565 std::shared_ptr<SerializationHandlerBase<intervehicle::protobuf::ExpireData>>>>
569 std::set<std::string> omit_publish_metadata_;
576template <
typename InnerTransporter>
598 this->
inner().template subscribe<intervehicle::groups::modem_ack_in, ack_pair_type>(
599 [
this](
const ack_pair_type& ack_pair)
600 { this->
template _handle_ack_or_expire<0>(ack_pair); });
603 this->
inner().template subscribe<intervehicle::groups::modem_expire_in, expire_pair_type>(
604 [
this](
const expire_pair_type& expire_pair)
605 { this->
template _handle_ack_or_expire<1>(expire_pair); });
613 template <
typename Data>
616 this->
inner().template publish<intervehicle::groups::modem_data_out>(
620 template <
typename Data>
621 void _subscribe(std::function<
void(std::shared_ptr<const Data> d)> func,
const Group&
group,
632 catch (
const InvalidUnsubscription& e)
638 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock) {
return 0; }
644template <
typename InnerTransporter>
650 using implementation_tag =
typename InnerTransporter::implementation_tag;
652 implementation_tag>::modem_id_type;
675 for (
auto& modem_driver_data : modem_drivers_)
677 modem_driver_data->driver_thread_alive =
false;
678 if (modem_driver_data->underlying_thread)
679 modem_driver_data->underlying_thread->join();
686 template <
typename Data>
689 this->
innermost().template publish<intervehicle::groups::modem_data_out>(
693 template <
typename Data>
694 void _subscribe(std::function<
void(std::shared_ptr<const Data> d)> func,
const Group&
group,
701 this->
innermost().template publish<intervehicle::groups::modem_subscription_forward_tx>(
704 catch (
const InvalidUnsubscription& e)
710 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock)
714 while (!received_.empty())
717 received_.pop_front();
730 using intervehicle::protobuf::Subscription;
731 auto subscribe_lambda = [=](std::shared_ptr<const Subscription> d)
735 intervehicle::protobuf::Subscription,
738 auto subscription = std::make_shared<
739 IntervehicleSerializationSubscription<Subscription, MarshallingScheme::DCCL>>(
742 auto dccl_id = SerializerParserHelper<Subscription, MarshallingScheme::DCCL>::id();
744 std::make_pair(subscription->subscribed_group(), subscription));
747 this->
innermost().template subscribe<intervehicle::groups::modem_data_in>(
748 [
this](
const intervehicle::protobuf::DCCLForwardedData&
msg)
749 { received_.push_back(
msg); });
754 using ack_pair_type = intervehicle::protobuf::AckMessagePair;
755 this->
innermost().template subscribe<intervehicle::groups::modem_ack_in, ack_pair_type>(
756 [
this](
const ack_pair_type& ack_pair)
757 { this->
template _handle_ack_or_expire<0>(ack_pair); });
759 using expire_pair_type = intervehicle::protobuf::ExpireMessagePair;
761 .template subscribe<intervehicle::groups::modem_expire_in, expire_pair_type>(
762 [
this](
const expire_pair_type& expire_pair)
763 { this->
template _handle_ack_or_expire<1>(expire_pair); });
765 this->
innermost().template subscribe<intervehicle::groups::modem_driver_ready, bool>(
766 [
this](
const bool& ready)
774 _set_up_persistent_subscriptions();
776 for (
int i = 0, n = cfg_.
link_size(); i < n; ++i)
781 link->mutable_mac()->set_modem_id(link->modem_id());
783 modem_drivers_.emplace_back(
new ModemDriverData);
784 ModemDriverData& data = *modem_drivers_.back();
786 data.underlying_thread.reset(
new std::thread(
791 data.modem_driver_thread.reset(
792 new intervehicle::ModemDriverThread<implementation_tag>(*link));
793 data.modem_driver_thread->run(data.driver_thread_alive);
795 catch (std::exception& e)
798 goby::glog <<
"Modem driver thread had uncaught exception: " <<
e.what()
806 std::this_thread::sleep_for(std::chrono::milliseconds(250));
809 while (drivers_ready_ < modem_drivers_.size())
813 std::this_thread::sleep_for(std::chrono::seconds(1));
820 goby::glog <<
"Begin loading subscriptions from persistent storage..." << std::endl;
821 for (
const auto& sub : former_sub_collection_.subscription())
824 intervehicle::protobuf::Subscription,
825 MarshallingScheme::PROTOBUF>(sub);
829 void _set_up_persistent_subscriptions()
836 std::stringstream file_name;
838 if (dir.back() !=
'/')
842 persist_sub_file_name_ = file_name.str();
844 std::ifstream persist_sub_ifs(persist_sub_file_name_.c_str());
847 if (persist_sub_ifs.is_open())
849 google::protobuf::TextFormat::Parser parser;
850 google::protobuf::io::IstreamInputStream iis(&persist_sub_ifs);
851 parser.Parse(&iis, &former_sub_collection_);
856 goby::glog <<
"Could not open persistent subscriptions file: "
857 << persist_sub_file_name_
858 <<
". Assuming no persistent subscriptions exist" << std::endl;
861 catch (
const std::exception& e)
864 goby::glog <<
"Error reading persistent subscriptions file: " <<
e.what()
869 std::ofstream persist_sub_ofs(persist_sub_file_name_.c_str());
870 if (!persist_sub_ofs.is_open())
873 goby::glog <<
"Could not open persistent subscriptions file for writing: "
874 << persist_sub_file_name_ << std::endl;
876 remove(persist_sub_file_name_.c_str());
878 this->
innermost().template subscribe<intervehicle::groups::subscription_report>(
879 [
this](
const intervehicle::protobuf::SubscriptionReport& report)
882 <<
report.ShortDebugString() << std::endl;
884 std::ofstream persist_sub_ofs(persist_sub_file_name_.c_str());
885 intervehicle::protobuf::SubscriptionPersistCollection collection;
886 collection.set_time_with_units(
887 goby::time::SystemClock::now<goby::time::MicroTime>());
888 for (
auto report_p : sub_reports_)
890 for (
const auto& sub : report_p.second.subscription())
891 *collection.add_subscription() = sub;
893 google::protobuf::TextFormat::Printer printer;
894 google::protobuf::io::OstreamOutputStream oos(&persist_sub_ofs);
896 goby::glog <<
"Collection: " << collection.ShortDebugString() << std::endl;
897 printer.Print(collection, &oos);
902 intervehicle::protobuf::PortalConfig cfg_;
904 struct ModemDriverData
906 std::unique_ptr<std::thread> underlying_thread;
907 std::unique_ptr<intervehicle::ModemDriverThread<implementation_tag>> modem_driver_thread;
908 std::atomic<bool> driver_thread_alive{
true};
910 std::vector<std::unique_ptr<ModemDriverData>> modem_drivers_;
911 unsigned drivers_ready_{0};
913 std::deque<intervehicle::protobuf::DCCLForwardedData> received_;
915 intervehicle::protobuf::SubscriptionPersistCollection former_sub_collection_;
916 std::string persist_sub_file_name_;
917 std::map<modem_id_type, intervehicle::protobuf::SubscriptionReport> sub_reports_;
simple exception class for goby applications
void set_modem_id(int32_t value)
static const ::PROTOBUF_NAMESPACE_ID::Descriptor * descriptor()
boost::units::unit< ttl_dimension, boost::units::si::system > ttl_unit
bool ack_required() const
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
static constexpr std::uint32_t broadcast_group
Special group number representing the broadcast group (used when no grouping is required for a given ...
static constexpr std::uint32_t invalid_numeric_group
Special group number representing an invalid numeric group (unsuitable for intervehicle and outer lay...
InnerTransporter & inner()
Implements the forwarder concept for the intervehicle layer.
virtual ~InterVehicleForwarder()=default
InterVehicleForwarder(InnerTransporter &inner)
Construct a forwarder for the intervehicle layer.
typename InnerTransporter::implementation_tag implementation_tag
Implements a portal for the intervehicle layer based on Goby Acomms.
InterVehiclePortal(InnerTransporter &inner, const intervehicle::protobuf::PortalConfig &cfg)
Instantiate a portal with the given configuration and a reference to an external inner transporter.
InterVehiclePortal(const intervehicle::protobuf::PortalConfig &cfg)
Instantiate a portal with the given configuration (with the portal owning the inner transporter)
Base class for implementing transporters (both portal and forwarder) for the intervehicle layer.
std::shared_ptr< intervehicle::protobuf::Subscription > _serialize_subscription(const Group &group, const Subscriber< Data > &subscriber, SubscriptionAction action)
static constexpr int scheme()
returns the marshalling scheme id for a given data type on this layer. Only MarshallingScheme::DCCL i...
void publish_dynamic(std::shared_ptr< Data > data, const Group &group=Group(), const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to mutable data variant)....
void subscribe_dynamic(std::function< void(std::shared_ptr< const Data >)> f, const Group &group=Group(), const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (shared pointer variant)....
void subscribe_dynamic(std::function< void(const Data &)> f, const Group &group=Group(), const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (const reference variant)....
virtual ~InterVehicleTransporterBase()=default
void _handle_ack_or_expire(const AckorExpirePair &ack_or_expire_pair)
void publish_dynamic(std::shared_ptr< const Data > data, const Group &group=Group(), const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to const data variant)....
InterVehicleTransporterBase(InnerTransporter &inner)
std::shared_ptr< intervehicle::protobuf::Subscription > _set_up_subscribe(std::function< void(std::shared_ptr< const Data > d)> func, const Group &group, const Subscriber< Data > &subscriber, SubscriptionAction action)
void _receive(const intervehicle::protobuf::DCCLForwardedData &packets)
void _insert_pending_ack(int dccl_id, std::shared_ptr< goby::middleware::protobuf::SerializerTransporterMessage > data, std::shared_ptr< SerializationHandlerBase< intervehicle::protobuf::AckData > > ack_handler, std::shared_ptr< SerializationHandlerBase< intervehicle::protobuf::ExpireData > > expire_handler)
void publish_dynamic(const Data &data, const Group &group=Group(), const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (const reference variant)....
void unsubscribe_dynamic(const Group &group=Group(), const Subscriber< Data > &subscriber=Subscriber< Data >())
Unsubscribe from a specific run-time defined group and data type. Where possible, prefer the static v...
InterVehicleTransporterBase()
void check_validity()
Check validity of the Group for intervehicle use (at compile time)
std::unordered_map< int, std::unordered_map< std::string, std::shared_ptr< const SerializationHandlerBase< intervehicle::protobuf::Header > > > > subscriptions_
std::shared_ptr< goby::middleware::protobuf::SerializerTransporterMessage > _set_up_publish(const Data &d, const Group &group, const Publisher< Data > &publisher)
Represents a subscription to a serialized data type (intervehicle layer).
InvalidPublication(const std::string &e)
InvalidSubscription(const std::string &e)
InvalidUnsubscription(const std::string &e)
int poll(const std::chrono::time_point< Clock, Duration > &timeout=std::chrono::time_point< Clock, Duration >::max())
poll for data. Blocks until a data event occurs or a timeout when a particular time has been reached
Utility class for allowing the various Goby middleware transporters to poll the underlying transport ...
Represents a callback for a published data type (e.g. acked_func or expired_func)
Class that holds additional metadata and callback functions related to a publication (and is optional...
const goby::middleware::protobuf::TransporterConfig & cfg() const
Returns the metadata configuration.
acked_func_type acked_func() const
Returns the acked data callback (or an empty function if none is set)
bool has_set_group_func() const
expired_func_type expired_func() const
Returns the expired data callback (or an empty function if none is set)
Base class for handling posting callbacks for serialized data types (interprocess and outer)
Defines the common interface for publishing and subscribing data using static (constexpr) groups on G...
void subscribe(std::function< void(const Data &)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (const reference variant)
void publish(const Data &data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (const reference variant)
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
subscribed_func_type subscribed_func() const
bool has_group_func() const
const goby::middleware::protobuf::TransporterConfig & cfg() const
subscribe_expired_func_type subscribe_expired_func() const
Provides the modem driver thread used by InterVehiclePortal, templated on ImplementationTag so it use...
const ::goby::middleware::intervehicle::protobuf::Header & header() const
const ::goby::middleware::intervehicle::protobuf::DCCLPacket & frame(int index) const
::goby::acomms::protobuf::DriverConfig * mutable_driver()
const std::string & name() const
const std::string & dir() const
const ::goby::middleware::intervehicle::protobuf::PortalConfig_PersistSubscriptions & persist_subscriptions() const
bool has_persist_subscriptions() const
::goby::middleware::intervehicle::protobuf::PortalConfig_LinkConfig * mutable_link(int index)
int subscription_size() const
static constexpr Action SUBSCRIBE
static constexpr Action UNSUBSCRIBE
static const ::PROTOBUF_NAMESPACE_ID::Descriptor * descriptor()
int32_t publisher_id(int index) const
const ::goby::acomms::protobuf::DynamicBufferConfig & buffer() const
const ::goby::middleware::intervehicle::protobuf::TransporterConfig & intervehicle() const
goby::util::logger::GroupSetter group(std::string n)
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::goby::acomms::protobuf::ModemReport, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::acomms::iridium::protobuf::Report >, 11, false > report
constexpr Group subscription_forward
constexpr Group modem_subscription_forward_tx
constexpr Group metadata_request
constexpr Group modem_data_in
constexpr Group modem_subscription_forward_rx
std::shared_ptr< goby::middleware::protobuf::SerializerTransporterMessage > serialize_publication(const Data &d, const Group &group, const Publisher< Data > &publisher)
The global namespace for the Goby project.
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::PROTOBUF_NAMESPACE_ID::MessageOptions, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
util::FlexOstream glog
Access the Goby logger through this object.
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
static std::string type_name()
The marshalling scheme specific string name for this type.
#define GOBY_INTERVEHICLE_API_VERSION