24#ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERVEHICLE_H
25#define GOBY_MIDDLEWARE_TRANSPORT_INTERVEHICLE_H
33#include <google/protobuf/io/zero_copy_stream_impl.h>
69template <
typename Derived,
typename InnerTransporter>
73 public Poller<InterVehicleTransporterBase<Derived, InnerTransporter>>
98 << request.ShortDebugString() << std::endl;
100 switch (request.request())
103 omit_publish_metadata_.erase(request.key().type());
106 omit_publish_metadata_.insert(request.key().type());
116 template <
typename Data>
static constexpr int scheme()
118 static_assert(goby::middleware::scheme<typename detail::primitive_type<Data>::type>() ==
120 "Can only use DCCL messages with InterVehicleTransporters");
130 "goby::middleware::Group must have non-zero numeric "
131 "value to publish on the InterVehicle layer");
141 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
146 "Can only use DCCL messages with InterVehicleTransporters");
148 Data data_with_group = data;
149 publisher.set_group(data_with_group,
group);
151 static_cast<Derived*
>(
this)->
template _publish<Data>(data_with_group,
group, publisher);
153 this->
inner().template publish_dynamic<Data, MarshallingScheme::DCCL>(data_with_group,
155 this->
inner().template publish_dynamic<Data, MarshallingScheme::PROTOBUF>(data_with_group,
166 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
171 "Can only use DCCL messages with InterVehicleTransporters");
175 std::shared_ptr<Data> data_with_group(data->New());
176 data_with_group->CopyFrom(*data);
178 publisher.set_group(*data_with_group,
group);
180 static_cast<Derived*
>(
this)->
template _publish<Data>(*data_with_group,
group,
184 this->
inner().template publish_dynamic<Data, MarshallingScheme::DCCL>(data_with_group,
186 this->
inner().template publish_dynamic<Data, MarshallingScheme::PROTOBUF>(
187 data_with_group,
group, publisher);
198 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
202 publish_dynamic<Data, scheme>(std::shared_ptr<const Data>(data),
group, publisher);
212 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
217 "Can only use DCCL messages with InterVehicleTransporters");
218 auto pointer_ref_lambda = [=](std::shared_ptr<const Data> d) { f(*d); };
219 static_cast<Derived*
>(
this)->
template _subscribe<Data>(
230 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
236 "Can only use DCCL messages with InterVehicleTransporters");
237 static_cast<Derived*
>(
this)->
template _subscribe<Data>(f,
group, subscriber,
247 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
252 "Can only use DCCL messages with InterVehicleTransporters");
253 static_cast<Derived*
>(
this)->
template _subscribe<Data>(
254 std::function<
void(std::shared_ptr<const Data>)>(),
group, subscriber,
259 template <
typename Data>
260 std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage>
265 std::stringstream ss;
266 ss <<
"Error: Publisher must have set_group_func in order to publish to a "
267 "non-broadcast Group ("
269 <<
"). The set_group_func modifies the contents of the outgoing message to store "
270 "the group information.";
278 auto ack_handler = std::make_shared<
282 auto expire_handler =
288 data, ack_handler, expire_handler);
291 if (!omit_publish_metadata_.count(data->key().type()))
292 _set_protobuf_metadata<Data>(data->mutable_key()->mutable_metadata(), d);
295 goby::glog <<
"Set up publishing for: " << data->ShortDebugString() << std::endl;
300 template <
typename Data>
301 std::shared_ptr<intervehicle::protobuf::Subscription>
313 std::stringstream ss;
314 ss <<
"Error: Subscriber must have group_func in order to subscribe to "
315 "non-broadcast Group ("
317 <<
"). The group_func returns the appropriate Group based on the contents "
318 "of the incoming message.";
325 std::stringstream ss;
326 ss <<
"Error: Broadcast subscriptions cannot have ack_required: true";
330 auto subscription = std::make_shared<
332 func,
group, subscriber);
346 std::stringstream ss;
347 ss <<
"Cannot unsubscribe to DCCL id: " << dccl_id
348 <<
" and group: " << std::string(
group) <<
" as no subscription was found.";
355 auto dccl_subscription =
356 this->
template _serialize_subscription<Data>(
group, subscriber, action);
364 auto subscribe_time = dccl_subscription->time_with_units();
365 subscription_publication->mutable_key()->set_serialize_time_with_units(subscribe_time);
371 auto expire_handler =
377 << subscription_publication->ShortDebugString()
380 this->pending_ack_.insert(std::make_pair(*subscription_publication,
381 std::make_tuple(ack_handler, expire_handler)));
383 return dccl_subscription;
386 template <
int tuple_index,
typename AckorExpirePair>
389 auto original = ack_or_expire_pair.serializer();
390 const auto& ack_or_expire_msg = ack_or_expire_pair.data();
392 original.key().type() ==
398 auto bytes_begin = original.data().begin(), bytes_end = original.data().end();
399 decltype(bytes_begin) actual_end;
403 auto subscription = Helper::parse(bytes_begin, bytes_end, actual_end);
404 subscription->mutable_header()->set_src(0);
406 std::vector<char> bytes(Helper::serialize(*subscription));
407 std::string* sbytes =
new std::string(bytes.begin(), bytes.end());
408 original.set_allocated_data(sbytes);
411 auto it = pending_ack_.find(original);
412 if (it != pending_ack_.end())
415 <<
" for: " << original.ShortDebugString() <<
", "
416 << ack_or_expire_msg.ShortDebugString()
419 std::get<tuple_index>(it->second)
420 ->post(original.data().begin(), original.data().end(), ack_or_expire_msg);
425 << (is_subscription ?
"subscription: " :
"data: ")
426 << original.ShortDebugString() << std::endl;
433 << packets.ShortDebugString() << std::endl;
435 for (
const auto& packet : packets.
frame())
438 p.second->post(packet.data().begin(), packet.data().end(), packets.
header());
442 template <
typename Data>
443 std::shared_ptr<intervehicle::protobuf::Subscription>
448 auto dccl_subscription = std::make_shared<intervehicle::protobuf::Subscription>();
449 dccl_subscription->mutable_header()->set_src(0);
452 dccl_subscription->mutable_header()->add_dest(
id);
455 dccl_subscription->set_dccl_id(dccl_id);
456 dccl_subscription->set_group(
group.numeric());
457 dccl_subscription->set_time_with_units(
458 goby::time::SystemClock::now<goby::time::MicroTime>());
463 _set_protobuf_metadata<Data>(dccl_subscription->mutable_metadata());
464 *dccl_subscription->mutable_intervehicle() = subscriber.
cfg().
intervehicle();
465 return dccl_subscription;
469 int dccl_id, std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage> data,
475 << data->ShortDebugString() << std::endl;
477 this->pending_ack_.insert(
478 std::make_pair(*data, std::make_tuple(ack_handler, expire_handler)));
491 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock)
493 _expire_pending_ack();
495 return static_cast<Derived*
>(
this)->_poll(lock);
501 _insert_file_desc_with_dependencies(Data::descriptor()->file(), meta);
504 template <
typename Data>
505 void _set_protobuf_metadata(protobuf::SerializerProtobufMetadata* meta,
const Data& d)
507 meta->set_protobuf_name(
509 _insert_file_desc_with_dependencies(d.GetDescriptor()->file(), meta);
513 void _insert_file_desc_with_dependencies(
const google::protobuf::FileDescriptor* file_desc,
514 protobuf::SerializerProtobufMetadata* meta)
516 for (
int i = 0, n = file_desc->dependency_count(); i < n; ++i)
517 _insert_file_desc_with_dependencies(file_desc->dependency(i), meta);
519 google::protobuf::FileDescriptorProto* file_desc_proto = meta->add_file_descriptor();
520 file_desc->CopyTo(file_desc_proto);
524 void _expire_pending_ack()
526 auto now = goby::time::SystemClock::now<goby::time::MicroTime>();
527 for (
auto it = pending_ack_.begin(), end = pending_ack_.end(); it != end;)
530 ->FindFieldByName(
"ttl")
532 .GetExtension(dccl::field)
536 decltype(now) serialize_time(it->first.key().serialize_time_with_units());
537 decltype(now) expire_time(serialize_time + max_ttl);
540 const decltype(now) interprocess_wait(1.0 * boost::units::si::seconds);
544 if (now > expire_time + interprocess_wait)
547 << it->first.ShortDebugString() << std::endl;
548 it = pending_ack_.erase(it);
562 protobuf::SerializerTransporterMessage,
563 std::tuple<std::shared_ptr<SerializationHandlerBase<intervehicle::protobuf::AckData>>,
564 std::shared_ptr<SerializationHandlerBase<intervehicle::protobuf::ExpireData>>>>
568 std::set<std::string> omit_publish_metadata_;
575template <
typename InnerTransporter>
597 this->
inner().template subscribe<intervehicle::groups::modem_ack_in, ack_pair_type>(
598 [
this](
const ack_pair_type& ack_pair)
599 { this->
template _handle_ack_or_expire<0>(ack_pair); });
602 this->
inner().template subscribe<intervehicle::groups::modem_expire_in, expire_pair_type>(
603 [
this](
const expire_pair_type& expire_pair)
604 { this->
template _handle_ack_or_expire<1>(expire_pair); });
612 template <
typename Data>
615 this->
inner().template publish<intervehicle::groups::modem_data_out>(
619 template <
typename Data>
620 void _subscribe(std::function<
void(std::shared_ptr<const Data> d)> func,
const Group&
group,
631 catch (
const InvalidUnsubscription& e)
637 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock) {
return 0; }
643template <
typename InnerTransporter>
649 using implementation_tag =
typename InnerTransporter::implementation_tag;
651 implementation_tag>::modem_id_type;
674 for (
auto& modem_driver_data : modem_drivers_)
676 modem_driver_data->driver_thread_alive =
false;
677 if (modem_driver_data->underlying_thread)
678 modem_driver_data->underlying_thread->join();
685 template <
typename Data>
688 this->
innermost().template publish<intervehicle::groups::modem_data_out>(
692 template <
typename Data>
693 void _subscribe(std::function<
void(std::shared_ptr<const Data> d)> func,
const Group&
group,
700 this->
innermost().template publish<intervehicle::groups::modem_subscription_forward_tx>(
703 catch (
const InvalidUnsubscription& e)
709 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock)
713 while (!received_.empty())
716 received_.pop_front();
729 using intervehicle::protobuf::Subscription;
730 auto subscribe_lambda = [=](std::shared_ptr<const Subscription> d)
734 intervehicle::protobuf::Subscription,
737 auto subscription = std::make_shared<
738 IntervehicleSerializationSubscription<Subscription, MarshallingScheme::DCCL>>(
741 auto dccl_id = SerializerParserHelper<Subscription, MarshallingScheme::DCCL>::id();
743 std::make_pair(subscription->subscribed_group(), subscription));
746 this->
innermost().template subscribe<intervehicle::groups::modem_data_in>(
747 [
this](
const intervehicle::protobuf::DCCLForwardedData&
msg)
748 { received_.push_back(
msg); });
753 using ack_pair_type = intervehicle::protobuf::AckMessagePair;
754 this->
innermost().template subscribe<intervehicle::groups::modem_ack_in, ack_pair_type>(
755 [
this](
const ack_pair_type& ack_pair)
756 { this->
template _handle_ack_or_expire<0>(ack_pair); });
758 using expire_pair_type = intervehicle::protobuf::ExpireMessagePair;
760 .template subscribe<intervehicle::groups::modem_expire_in, expire_pair_type>(
761 [
this](
const expire_pair_type& expire_pair)
762 { this->
template _handle_ack_or_expire<1>(expire_pair); });
764 this->
innermost().template subscribe<intervehicle::groups::modem_driver_ready, bool>(
765 [
this](
const bool& ready)
773 _set_up_persistent_subscriptions();
775 for (
int i = 0, n = cfg_.
link_size(); i < n; ++i)
780 link->mutable_mac()->set_modem_id(link->modem_id());
782 modem_drivers_.emplace_back(
new ModemDriverData);
783 ModemDriverData& data = *modem_drivers_.back();
785 data.underlying_thread.reset(
new std::thread(
790 data.modem_driver_thread.reset(
791 new intervehicle::ModemDriverThread<implementation_tag>(*link));
792 data.modem_driver_thread->run(data.driver_thread_alive);
794 catch (std::exception& e)
797 goby::glog <<
"Modem driver thread had uncaught exception: " <<
e.what()
805 std::this_thread::sleep_for(std::chrono::milliseconds(250));
808 while (drivers_ready_ < modem_drivers_.size())
812 std::this_thread::sleep_for(std::chrono::seconds(1));
819 goby::glog <<
"Begin loading subscriptions from persistent storage..." << std::endl;
820 for (
const auto& sub : former_sub_collection_.subscription())
823 intervehicle::protobuf::Subscription,
824 MarshallingScheme::PROTOBUF>(sub);
828 void _set_up_persistent_subscriptions()
835 std::stringstream file_name;
837 if (dir.back() !=
'/')
841 persist_sub_file_name_ = file_name.str();
843 std::ifstream persist_sub_ifs(persist_sub_file_name_.c_str());
846 if (persist_sub_ifs.is_open())
848 google::protobuf::TextFormat::Parser parser;
849 google::protobuf::io::IstreamInputStream iis(&persist_sub_ifs);
850 parser.Parse(&iis, &former_sub_collection_);
855 goby::glog <<
"Could not open persistent subscriptions file: "
856 << persist_sub_file_name_
857 <<
". Assuming no persistent subscriptions exist" << std::endl;
860 catch (
const std::exception& e)
863 goby::glog <<
"Error reading persistent subscriptions file: " <<
e.what()
868 std::ofstream persist_sub_ofs(persist_sub_file_name_.c_str());
869 if (!persist_sub_ofs.is_open())
872 goby::glog <<
"Could not open persistent subscriptions file for writing: "
873 << persist_sub_file_name_ << std::endl;
875 remove(persist_sub_file_name_.c_str());
877 this->
innermost().template subscribe<intervehicle::groups::subscription_report>(
878 [
this](
const intervehicle::protobuf::SubscriptionReport& report)
881 <<
report.ShortDebugString() << std::endl;
883 std::ofstream persist_sub_ofs(persist_sub_file_name_.c_str());
884 intervehicle::protobuf::SubscriptionPersistCollection collection;
885 collection.set_time_with_units(
886 goby::time::SystemClock::now<goby::time::MicroTime>());
887 for (
auto report_p : sub_reports_)
889 for (
const auto& sub : report_p.second.subscription())
890 *collection.add_subscription() = sub;
892 google::protobuf::TextFormat::Printer printer;
893 google::protobuf::io::OstreamOutputStream oos(&persist_sub_ofs);
895 goby::glog <<
"Collection: " << collection.ShortDebugString() << std::endl;
896 printer.Print(collection, &oos);
901 intervehicle::protobuf::PortalConfig cfg_;
903 struct ModemDriverData
905 std::unique_ptr<std::thread> underlying_thread;
906 std::unique_ptr<intervehicle::ModemDriverThread<implementation_tag>> modem_driver_thread;
907 std::atomic<bool> driver_thread_alive{
true};
909 std::vector<std::unique_ptr<ModemDriverData>> modem_drivers_;
910 unsigned drivers_ready_{0};
912 std::deque<intervehicle::protobuf::DCCLForwardedData> received_;
914 intervehicle::protobuf::SubscriptionPersistCollection former_sub_collection_;
915 std::string persist_sub_file_name_;
916 std::map<modem_id_type, intervehicle::protobuf::SubscriptionReport> sub_reports_;
Simple exception class for Goby applications
void set_modem_id(int32_t value)
static const ::PROTOBUF_NAMESPACE_ID::Descriptor * descriptor()
boost::units::unit< ttl_dimension, boost::units::si::system > ttl_unit
bool ack_required() const
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
static constexpr std::uint32_t broadcast_group
Special group number representing the broadcast group (used when no grouping is required for a given ...
static constexpr std::uint32_t invalid_numeric_group
Special group number representing an invalid numeric group (unsuitable for intervehicle and outer lay...
InnerTransporter & inner()
Implements the forwarder concept for the intervehicle layer.
virtual ~InterVehicleForwarder()=default
InterVehicleForwarder(InnerTransporter &inner)
Construct a forwarder for the intervehicle layer.
typename InnerTransporter::implementation_tag implementation_tag
Implements a portal for the intervehicle layer based on Goby Acomms.
InterVehiclePortal(InnerTransporter &inner, const intervehicle::protobuf::PortalConfig &cfg)
Instantiate a portal with the given configuration and a reference to an external inner transporter.
InterVehiclePortal(const intervehicle::protobuf::PortalConfig &cfg)
Instantiate a portal with the given configuration (with the portal owning the inner transporter)
Base class for implementing transporters (both portal and forwarder) for the intervehicle layer.
std::shared_ptr< intervehicle::protobuf::Subscription > _serialize_subscription(const Group &group, const Subscriber< Data > &subscriber, SubscriptionAction action)
static constexpr int scheme()
Returns the marshalling scheme id for a given data type on this layer. Only MarshallingScheme::DCCL i...
void publish_dynamic(std::shared_ptr< Data > data, const Group &group=Group(), const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to mutable data variant)....
void subscribe_dynamic(std::function< void(std::shared_ptr< const Data >)> f, const Group &group=Group(), const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (shared pointer variant)....
void subscribe_dynamic(std::function< void(const Data &)> f, const Group &group=Group(), const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (const reference variant)....
virtual ~InterVehicleTransporterBase()=default
void _handle_ack_or_expire(const AckorExpirePair &ack_or_expire_pair)
void publish_dynamic(std::shared_ptr< const Data > data, const Group &group=Group(), const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to const data variant)....
InterVehicleTransporterBase(InnerTransporter &inner)
std::shared_ptr< intervehicle::protobuf::Subscription > _set_up_subscribe(std::function< void(std::shared_ptr< const Data > d)> func, const Group &group, const Subscriber< Data > &subscriber, SubscriptionAction action)
void _receive(const intervehicle::protobuf::DCCLForwardedData &packets)
void _insert_pending_ack(int dccl_id, std::shared_ptr< goby::middleware::protobuf::SerializerTransporterMessage > data, std::shared_ptr< SerializationHandlerBase< intervehicle::protobuf::AckData > > ack_handler, std::shared_ptr< SerializationHandlerBase< intervehicle::protobuf::ExpireData > > expire_handler)
void publish_dynamic(const Data &data, const Group &group=Group(), const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (const reference variant)....
void unsubscribe_dynamic(const Group &group=Group(), const Subscriber< Data > &subscriber=Subscriber< Data >())
Unsubscribe from a specific run-time defined group and data type. Where possible, prefer the static v...
InterVehicleTransporterBase()
void check_validity()
Check validity of the Group for intervehicle use (at compile time)
std::unordered_map< int, std::unordered_map< std::string, std::shared_ptr< const SerializationHandlerBase< intervehicle::protobuf::Header > > > > subscriptions_
std::shared_ptr< goby::middleware::protobuf::SerializerTransporterMessage > _set_up_publish(const Data &d, const Group &group, const Publisher< Data > &publisher)
Represents a subscription to a serialized data type (intervehicle layer).
InvalidPublication(const std::string &e)
InvalidSubscription(const std::string &e)
InvalidUnsubscription(const std::string &e)
int poll(const std::chrono::time_point< Clock, Duration > &timeout=std::chrono::time_point< Clock, Duration >::max())
Poll for data. Blocks until a data event occurs or the given timeout time point is reached
Utility class for allowing the various Goby middleware transporters to poll the underlying transport ...
Represents a callback for a published data type (e.g. acked_func or expired_func)
Class that holds additional metadata and callback functions related to a publication (and is optional...
const goby::middleware::protobuf::TransporterConfig & cfg() const
Returns the metadata configuration.
acked_func_type acked_func() const
Returns the acked data callback (or an empty function if none is set)
bool has_set_group_func() const
expired_func_type expired_func() const
Returns the expired data callback (or an empty function if none is set)
Base class for handling posting callbacks for serialized data types (interprocess and outer)
Defines the common interface for publishing and subscribing data using static (constexpr) groups on G...
void subscribe(std::function< void(const Data &)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (const reference variant)
void publish(const Data &data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (const reference variant)
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
subscribed_func_type subscribed_func() const
bool has_group_func() const
const goby::middleware::protobuf::TransporterConfig & cfg() const
subscribe_expired_func_type subscribe_expired_func() const
Provides the modem driver thread used by InterVehiclePortal, templated on ImplementationTag so it use...
const ::goby::middleware::intervehicle::protobuf::Header & header() const
const ::goby::middleware::intervehicle::protobuf::DCCLPacket & frame(int index) const
::goby::acomms::protobuf::DriverConfig * mutable_driver()
const std::string & name() const
const std::string & dir() const
const ::goby::middleware::intervehicle::protobuf::PortalConfig_PersistSubscriptions & persist_subscriptions() const
bool has_persist_subscriptions() const
::goby::middleware::intervehicle::protobuf::PortalConfig_LinkConfig * mutable_link(int index)
int subscription_size() const
static constexpr Action SUBSCRIBE
static constexpr Action UNSUBSCRIBE
static const ::PROTOBUF_NAMESPACE_ID::Descriptor * descriptor()
int32_t publisher_id(int index) const
const ::goby::acomms::protobuf::DynamicBufferConfig & buffer() const
const ::goby::middleware::intervehicle::protobuf::TransporterConfig & intervehicle() const
goby::util::logger::GroupSetter group(std::string n)
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::goby::acomms::protobuf::ModemReport, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::acomms::iridium::protobuf::Report >, 11, false > report
constexpr Group subscription_forward
constexpr Group modem_subscription_forward_tx
constexpr Group metadata_request
constexpr Group modem_data_in
constexpr Group modem_subscription_forward_rx
std::shared_ptr< goby::middleware::protobuf::SerializerTransporterMessage > serialize_publication(const Data &d, const Group &group, const Publisher< Data > &publisher)
The global namespace for the Goby project.
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::PROTOBUF_NAMESPACE_ID::MessageOptions, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
util::FlexOstream glog
Access the Goby logger through this object.
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
static std::string type_name()
The marshalling scheme specific string name for this type.
#define GOBY_INTERVEHICLE_API_VERSION