Goby3 3.5.1
2026.06.04
Loading...
Searching...
No Matches
intervehicle.h
Go to the documentation of this file.
1// Copyright 2016-2026:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6// Copilot <198982749+Copilot@users.noreply.github.com>
7//
8//
9// This file is part of the Goby Underwater Autonomy Project Libraries
10// ("The Goby Libraries").
11//
12// The Goby Libraries are free software: you can redistribute them and/or modify
13// them under the terms of the GNU Lesser General Public License as published by
14// the Free Software Foundation, either version 2.1 of the License, or
15// (at your option) any later version.
16//
17// The Goby Libraries are distributed in the hope that they will be useful,
18// but WITHOUT ANY WARRANTY; without even the implied warranty of
19// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20// GNU Lesser General Public License for more details.
21//
22// You should have received a copy of the GNU Lesser General Public License
23// along with Goby. If not, see <http://www.gnu.org/licenses/>.
24
25#ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERVEHICLE_H
26#define GOBY_MIDDLEWARE_TRANSPORT_INTERVEHICLE_H
27
28#include <atomic>
29#include <functional>
30#include <sys/types.h>
31#include <thread>
32#include <unistd.h>
33
34#include <google/protobuf/io/zero_copy_stream_impl.h>
35
37
39#include "goby/middleware/transport/interthread.h" // used for InterVehiclePortal implementation
43
44namespace goby
45{
46namespace middleware
47{
49{
50 public:
51 InvalidSubscription(const std::string& e) : Exception(e) {}
52};
53
55{
56 public:
57 InvalidPublication(const std::string& e) : Exception(e) {}
58};
59
61{
62 public:
63 InvalidUnsubscription(const std::string& e) : Exception(e) {}
64};
65
70template <typename Derived, typename InnerTransporter>
72 : public StaticTransporterInterface<InterVehicleTransporterBase<Derived, InnerTransporter>,
73 InnerTransporter>,
74 public Poller<InterVehicleTransporterBase<Derived, InnerTransporter>>
75{
76 using InterfaceType =
78 InnerTransporter>;
79
81
82 public:
84 {
87 };
88
91 {
92 // handle request from Portal to omit or include metadata on future publications for a given data type
93 this->inner()
96 [this](const protobuf::SerializerMetadataRequest& request)
97 {
98 glog.is_debug3() && glog << "Received DCCL metadata request: "
99 << request.ShortDebugString() << std::endl;
100
101 switch (request.request())
102 {
104 omit_publish_metadata_.erase(request.key().type());
105 break;
107 omit_publish_metadata_.insert(request.key().type());
108 break;
109 }
110 });
111 }
113
114 virtual ~InterVehicleTransporterBase() = default;
115
117 template <typename Data> static constexpr int scheme()
118 {
119 static_assert(goby::middleware::scheme<typename detail::primitive_type<Data>::type>() ==
121 "Can only use DCCL messages with InterVehicleTransporters");
123 }
124
128 template <const Group& group> void check_validity()
129 {
130 static_assert(group.numeric() != Group::invalid_numeric_group,
131 "goby::middleware::Group must have non-zero numeric "
132 "value to publish on the InterVehicle layer");
133 }
134
142 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
143 void publish_dynamic(const Data& data, const Group& group = Group(),
144 const Publisher<Data>& publisher = Publisher<Data>())
145 {
146 static_assert(scheme == MarshallingScheme::DCCL,
147 "Can only use DCCL messages with InterVehicleTransporters");
148
149 Data data_with_group = data;
150 publisher.set_group(data_with_group, group);
151
152 static_cast<Derived*>(this)->template _publish<Data>(data_with_group, group, publisher);
153 // publish to interprocess as both DCCL and Protobuf
154 this->inner().template publish_dynamic<Data, MarshallingScheme::DCCL>(data_with_group,
155 group, publisher);
156 this->inner().template publish_dynamic<Data, MarshallingScheme::PROTOBUF>(data_with_group,
157 group, publisher);
158 }
159
167 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
168 void publish_dynamic(std::shared_ptr<const Data> data, const Group& group = Group(),
169 const Publisher<Data>& publisher = Publisher<Data>())
170 {
171 static_assert(scheme == MarshallingScheme::DCCL,
172 "Can only use DCCL messages with InterVehicleTransporters");
173 if (data)
174 {
175 // copy this way as it allows us to copy Data == google::protobuf::Message abstract base class
176 std::shared_ptr<Data> data_with_group(data->New());
177 data_with_group->CopyFrom(*data);
178
179 publisher.set_group(*data_with_group, group);
180
181 static_cast<Derived*>(this)->template _publish<Data>(*data_with_group, group,
182 publisher);
183
184 // publish to interprocess as both DCCL and Protobuf
185 this->inner().template publish_dynamic<Data, MarshallingScheme::DCCL>(data_with_group,
186 group, publisher);
187 this->inner().template publish_dynamic<Data, MarshallingScheme::PROTOBUF>(
188 data_with_group, group, publisher);
189 }
190 }
191
199 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
200 void publish_dynamic(std::shared_ptr<Data> data, const Group& group = Group(),
201 const Publisher<Data>& publisher = Publisher<Data>())
202 {
203 publish_dynamic<Data, scheme>(std::shared_ptr<const Data>(data), group, publisher);
204 }
205
213 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
214 void subscribe_dynamic(std::function<void(const Data&)> f, const Group& group = Group(),
215 const Subscriber<Data>& subscriber = Subscriber<Data>())
216 {
217 static_assert(scheme == MarshallingScheme::DCCL,
218 "Can only use DCCL messages with InterVehicleTransporters");
219 auto pointer_ref_lambda = [=](std::shared_ptr<const Data> d) { f(*d); };
220 static_cast<Derived*>(this)->template _subscribe<Data>(
221 pointer_ref_lambda, group, subscriber, SubscriptionAction::SUBSCRIBE);
222 }
223
231 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
232 void subscribe_dynamic(std::function<void(std::shared_ptr<const Data>)> f,
233 const Group& group = Group(),
234 const Subscriber<Data>& subscriber = Subscriber<Data>())
235 {
236 static_assert(scheme == MarshallingScheme::DCCL,
237 "Can only use DCCL messages with InterVehicleTransporters");
238 static_cast<Derived*>(this)->template _subscribe<Data>(f, group, subscriber,
240 }
241
248 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
250 const Subscriber<Data>& subscriber = Subscriber<Data>())
251 {
252 static_assert(scheme == MarshallingScheme::DCCL,
253 "Can only use DCCL messages with InterVehicleTransporters");
254 static_cast<Derived*>(this)->template _subscribe<Data>(
255 std::function<void(std::shared_ptr<const Data>)>(), group, subscriber,
257 }
258
259 protected:
260 template <typename Data>
261 std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage>
262 _set_up_publish(const Data& d, const Group& group, const Publisher<Data>& publisher)
263 {
264 if (group.numeric() != Group::broadcast_group && !publisher.has_set_group_func())
265 {
266 std::stringstream ss;
267 ss << "Error: Publisher must have set_group_func in order to publish to a "
268 "non-broadcast Group ("
269 << group
270 << "). The set_group_func modifies the contents of the outgoing message to store "
271 "the group information.";
272 throw(InvalidPublication(ss.str()));
273 }
274
275 auto data = intervehicle::serialize_publication(d, group, publisher);
276
277 if (publisher.cfg().intervehicle().buffer().ack_required())
278 {
279 auto ack_handler = std::make_shared<
281 publisher.acked_func(), d);
282
283 auto expire_handler =
284 std::make_shared<PublisherCallback<Data, MarshallingScheme::DCCL,
286 publisher.expired_func(), d);
287
289 data, ack_handler, expire_handler);
290 }
291
292 if (!omit_publish_metadata_.count(data->key().type()))
293 _set_protobuf_metadata<Data>(data->mutable_key()->mutable_metadata(), d);
294
296 goby::glog << "Set up publishing for: " << data->ShortDebugString() << std::endl;
297
298 return data;
299 }
300
301 template <typename Data>
302 std::shared_ptr<intervehicle::protobuf::Subscription>
303 _set_up_subscribe(std::function<void(std::shared_ptr<const Data> d)> func, const Group& group,
304 const Subscriber<Data>& subscriber, SubscriptionAction action)
305 {
307
308 switch (action)
309 {
311 {
312 if (group.numeric() != Group::broadcast_group && !subscriber.has_group_func())
313 {
314 std::stringstream ss;
315 ss << "Error: Subscriber must have group_func in order to subscribe to "
316 "non-broadcast Group ("
317 << group
318 << "). The group_func returns the appropriate Group based on the contents "
319 "of the incoming message.";
320 throw(InvalidSubscription(ss.str()));
321 }
322
323 if (subscriber.cfg().intervehicle().broadcast() &&
324 subscriber.cfg().intervehicle().buffer().ack_required())
325 {
326 std::stringstream ss;
327 ss << "Error: Broadcast subscriptions cannot have ack_required: true";
328 throw(InvalidSubscription(ss.str()));
329 }
330
331 auto subscription = std::make_shared<
333 func, group, subscriber);
334
335 this->subscriptions_[dccl_id][group] = subscription;
336 }
337 break;
339 {
340 auto sub_it = this->subscriptions_[dccl_id].find(group);
341 if (sub_it != this->subscriptions_[dccl_id].end())
342 {
343 this->subscriptions_[dccl_id].erase(sub_it);
344 }
345 else
346 {
347 std::stringstream ss;
348 ss << "Cannot unsubscribe to DCCL id: " << dccl_id
349 << " and group: " << std::string(group) << " as no subscription was found.";
350 throw(InvalidUnsubscription(ss.str()));
351 }
352 }
353 break;
354 }
355
356 auto dccl_subscription =
357 this->template _serialize_subscription<Data>(group, subscriber, action);
359 // insert pending subscription
360 auto subscription_publication = intervehicle::serialize_publication(
363
364 // overwrite timestamps to ensure mapping with driver threads
365 auto subscribe_time = dccl_subscription->time_with_units();
366 subscription_publication->mutable_key()->set_serialize_time_with_units(subscribe_time);
367
368 auto ack_handler = std::make_shared<PublisherCallback<Subscription, MarshallingScheme::DCCL,
370 subscriber.subscribed_func());
371
372 auto expire_handler =
373 std::make_shared<PublisherCallback<Subscription, MarshallingScheme::DCCL,
375 subscriber.subscribe_expired_func());
376
377 goby::glog.is_debug1() && goby::glog << "Inserting subscription ack handler for "
378 << subscription_publication->ShortDebugString()
379 << std::endl;
380
381 this->pending_ack_.insert(std::make_pair(*subscription_publication,
382 std::make_tuple(ack_handler, expire_handler)));
383
384 return dccl_subscription;
385 }
386
387 template <int tuple_index, typename AckorExpirePair>
388 void _handle_ack_or_expire(const AckorExpirePair& ack_or_expire_pair)
389 {
390 auto original = ack_or_expire_pair.serializer();
391 const auto& ack_or_expire_msg = ack_or_expire_pair.data();
392 bool is_subscription = original.key().marshalling_scheme() == MarshallingScheme::DCCL &&
393 original.key().type() ==
395
396 if (is_subscription)
397 {
398 // rewrite data to remove src()
399 auto bytes_begin = original.data().begin(), bytes_end = original.data().end();
400 decltype(bytes_begin) actual_end;
401
404 auto subscription = Helper::parse(bytes_begin, bytes_end, actual_end);
405 subscription->mutable_header()->set_src(0);
406
407 std::vector<char> bytes(Helper::serialize(*subscription));
408 std::string* sbytes = new std::string(bytes.begin(), bytes.end());
409 original.set_allocated_data(sbytes);
410 }
411
412 auto it = pending_ack_.find(original);
413 if (it != pending_ack_.end())
414 {
415 goby::glog.is_debug3() && goby::glog << ack_or_expire_msg.GetDescriptor()->name()
416 << " for: " << original.ShortDebugString() << ", "
417 << ack_or_expire_msg.ShortDebugString()
418 << std::endl;
419
420 std::get<tuple_index>(it->second)
421 ->post(original.data().begin(), original.data().end(), ack_or_expire_msg);
422 }
423 else
424 {
425 goby::glog.is_debug3() && goby::glog << "No pending Ack/Expire for "
426 << (is_subscription ? "subscription: " : "data: ")
427 << original.ShortDebugString() << std::endl;
428 }
429 }
430
432 {
433 goby::glog.is_debug3() && goby::glog << "Received DCCLForwarded data: "
434 << packets.ShortDebugString() << std::endl;
435
436 for (const auto& packet : packets.frame())
437 {
438 for (auto p : this->subscriptions_[packet.dccl_id()])
439 p.second->post(packet.data().begin(), packet.data().end(), packets.header());
440 }
441 }
442
443 template <typename Data>
444 std::shared_ptr<intervehicle::protobuf::Subscription>
446 SubscriptionAction action)
447 {
449 auto dccl_subscription = std::make_shared<intervehicle::protobuf::Subscription>();
450 dccl_subscription->mutable_header()->set_src(0);
451
452 for (auto id : subscriber.cfg().intervehicle().publisher_id())
453 dccl_subscription->mutable_header()->add_dest(id);
454
455 dccl_subscription->set_api_version(GOBY_INTERVEHICLE_API_VERSION);
456 dccl_subscription->set_dccl_id(dccl_id);
457 dccl_subscription->set_group(group.numeric());
458 dccl_subscription->set_time_with_units(
459 goby::time::SystemClock::now<goby::time::MicroTime>());
460 dccl_subscription->set_action((action == SubscriptionAction::SUBSCRIBE)
463
464 _set_protobuf_metadata<Data>(dccl_subscription->mutable_metadata());
465 *dccl_subscription->mutable_intervehicle() = subscriber.cfg().intervehicle();
466 return dccl_subscription;
467 }
468
470 int dccl_id, std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage> data,
473 expire_handler)
474 {
475 goby::glog.is_debug3() && goby::glog << "Inserting ack handler for "
476 << data->ShortDebugString() << std::endl;
477
478 this->pending_ack_.insert(
479 std::make_pair(*data, std::make_tuple(ack_handler, expire_handler)));
480 }
481
482 protected:
483 // maps DCCL ID to map of Group->subscription
484 // only one subscription allowed per IntervehicleForwarder/Portal (new subscription overwrites old one)
485 std::unordered_map<
486 int, std::unordered_map<std::string, std::shared_ptr<const SerializationHandlerBase<
489
490 private:
491 friend PollerType;
492 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock)
493 {
494 _expire_pending_ack();
495
496 return static_cast<Derived*>(this)->_poll(lock);
497 }
498
499 template <typename Data> void _set_protobuf_metadata(protobuf::SerializerProtobufMetadata* meta)
500 {
502 _insert_file_desc_with_dependencies(Data::descriptor()->file(), meta);
503 }
504
505 template <typename Data>
506 void _set_protobuf_metadata(protobuf::SerializerProtobufMetadata* meta, const Data& d)
507 {
508 meta->set_protobuf_name(
510 _insert_file_desc_with_dependencies(d.GetDescriptor()->file(), meta);
511 }
512
513 // used to populated InterVehicleSubscription file_descriptor fields
514 void _insert_file_desc_with_dependencies(const google::protobuf::FileDescriptor* file_desc,
515 protobuf::SerializerProtobufMetadata* meta)
516 {
517 for (int i = 0, n = file_desc->dependency_count(); i < n; ++i)
518 _insert_file_desc_with_dependencies(file_desc->dependency(i), meta);
519
520 google::protobuf::FileDescriptorProto* file_desc_proto = meta->add_file_descriptor();
521 file_desc->CopyTo(file_desc_proto);
522 }
523
524 // expire any pending_ack entries that are no longer relevant
525 void _expire_pending_ack()
526 {
527 auto now = goby::time::SystemClock::now<goby::time::MicroTime>();
528 for (auto it = pending_ack_.begin(), end = pending_ack_.end(); it != end;)
529 {
531 ->FindFieldByName("ttl")
532 ->options()
533 .GetExtension(dccl::field)
534 .max() *
536
537 decltype(now) serialize_time(it->first.key().serialize_time_with_units());
538 decltype(now) expire_time(serialize_time + max_ttl);
539
540 // time to let any expire messages from the drivers propagate through the interprocess layer before we remove this
541 const decltype(now) interprocess_wait(1.0 * boost::units::si::seconds);
542
543 // loop through pending ack, and clear any at the front that can be removed
544
545 if (now > expire_time + interprocess_wait)
546 {
547 goby::glog.is_debug3() && goby::glog << "Erasing pending ack for "
548 << it->first.ShortDebugString() << std::endl;
549 it = pending_ack_.erase(it);
550 }
551 else
552 {
553 // pending_ack_ is ordered by serialize time, so we can bail now
554 break;
555 }
556 }
557 }
558
559 private:
560 // maps data with ack_requested onto callbacks for when the data are acknowledged or expire
561 // ordered by serialize time
562 std::map<
563 protobuf::SerializerTransporterMessage,
564 std::tuple<std::shared_ptr<SerializationHandlerBase<intervehicle::protobuf::AckData>>,
565 std::shared_ptr<SerializationHandlerBase<intervehicle::protobuf::ExpireData>>>>
566 pending_ack_;
567
568 // map of Protobuf names where we can omit metadata on publication
569 std::set<std::string> omit_publish_metadata_;
570};
571
576template <typename InnerTransporter>
578 : public InterVehicleTransporterBase<InterVehicleForwarder<InnerTransporter>, InnerTransporter>
579{
580 public:
581 using implementation_tag = typename InnerTransporter::implementation_tag;
582
583 using Base =
585
589 InterVehicleForwarder(InnerTransporter& inner) : Base(inner)
590 {
591 this->inner()
595 { this->_receive(msg); });
596
597 using ack_pair_type = intervehicle::protobuf::AckMessagePair;
598 this->inner().template subscribe<intervehicle::groups::modem_ack_in, ack_pair_type>(
599 [this](const ack_pair_type& ack_pair)
600 { this->template _handle_ack_or_expire<0>(ack_pair); });
601
602 using expire_pair_type = intervehicle::protobuf::ExpireMessagePair;
603 this->inner().template subscribe<intervehicle::groups::modem_expire_in, expire_pair_type>(
604 [this](const expire_pair_type& expire_pair)
605 { this->template _handle_ack_or_expire<1>(expire_pair); });
606 }
607
608 virtual ~InterVehicleForwarder() = default;
609
610 friend Base;
611
612 private:
613 template <typename Data>
614 void _publish(const Data& d, const Group& group, const Publisher<Data>& publisher)
615 {
616 this->inner().template publish<intervehicle::groups::modem_data_out>(
617 this->_set_up_publish(d, group, publisher));
618 }
619
620 template <typename Data>
621 void _subscribe(std::function<void(std::shared_ptr<const Data> d)> func, const Group& group,
622 const Subscriber<Data>& subscriber, typename Base::SubscriptionAction action)
623 {
624 try
625 {
626 this->inner()
630 this->_set_up_subscribe(func, group, subscriber, action));
631 }
632 catch (const InvalidUnsubscription& e)
633 {
634 goby::glog.is_warn() && goby::glog << e.what() << std::endl;
635 }
636 }
637
638 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock) { return 0; }
639};
640
644template <typename InnerTransporter>
646 : public InterVehicleTransporterBase<InterVehiclePortal<InnerTransporter>, InnerTransporter>
647{
648 // Derive the ImplementationTag from InnerTransporter so that the modem driver thread
649 // uses the same InterProcessForwarder prefix as the portal's inner transporter.
650 using implementation_tag = typename InnerTransporter::implementation_tag;
651 using modem_id_type = typename goby::middleware::intervehicle::ModemDriverThread<
652 implementation_tag>::modem_id_type;
653
654 public:
655 using Base =
657
661 InterVehiclePortal(const intervehicle::protobuf::PortalConfig& cfg) : cfg_(cfg) { _init(); }
662
668 : Base(inner), cfg_(cfg)
669 {
670 _init();
671 }
672
674 {
675 for (auto& modem_driver_data : modem_drivers_)
676 {
677 modem_driver_data->driver_thread_alive = false;
678 if (modem_driver_data->underlying_thread)
679 modem_driver_data->underlying_thread->join();
680 }
681 }
682
683 friend Base;
684
685 private:
686 template <typename Data>
687 void _publish(const Data& d, const Group& group, const Publisher<Data>& publisher)
688 {
689 this->innermost().template publish<intervehicle::groups::modem_data_out>(
690 this->_set_up_publish(d, group, publisher));
691 }
692
693 template <typename Data>
694 void _subscribe(std::function<void(std::shared_ptr<const Data> d)> func, const Group& group,
695 const Subscriber<Data>& subscriber, typename Base::SubscriptionAction action)
696 {
697 try
698 {
699 auto dccl_subscription = this->_set_up_subscribe(func, group, subscriber, action);
700
701 this->innermost().template publish<intervehicle::groups::modem_subscription_forward_tx>(
702 dccl_subscription);
703 }
704 catch (const InvalidUnsubscription& e)
705 {
706 goby::glog.is_warn() && goby::glog << e.what() << std::endl;
707 }
708 }
709
710 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock)
711 {
712 int items = 0;
714 while (!received_.empty())
715 {
716 this->_receive(received_.front());
717 received_.pop_front();
718 ++items;
719 if (lock)
720 lock.reset();
721 }
722 return items;
723 }
724
725 void _init()
726 {
727 // set up reception of forwarded (via acoustic) subscriptions,
728 // then re-publish to driver threads
729 {
730 using intervehicle::protobuf::Subscription;
731 auto subscribe_lambda = [=](std::shared_ptr<const Subscription> d)
732 {
733 this->innermost()
735 intervehicle::protobuf::Subscription,
737 };
738 auto subscription = std::make_shared<
739 IntervehicleSerializationSubscription<Subscription, MarshallingScheme::DCCL>>(
740 subscribe_lambda);
741
742 auto dccl_id = SerializerParserHelper<Subscription, MarshallingScheme::DCCL>::id();
743 this->subscriptions_[dccl_id].insert(
744 std::make_pair(subscription->subscribed_group(), subscription));
745 }
746
747 this->innermost().template subscribe<intervehicle::groups::modem_data_in>(
748 [this](const intervehicle::protobuf::DCCLForwardedData& msg)
749 { received_.push_back(msg); });
750
751 // a message requiring ack can be disposed by either [1] ack, [2] expire (TTL exceeded), [3] having no subscribers, [4] queue size exceeded.
752 // post the correct callback (ack for [1] and expire for [2-4])
753 // and remove the pending ack message
754 using ack_pair_type = intervehicle::protobuf::AckMessagePair;
755 this->innermost().template subscribe<intervehicle::groups::modem_ack_in, ack_pair_type>(
756 [this](const ack_pair_type& ack_pair)
757 { this->template _handle_ack_or_expire<0>(ack_pair); });
758
759 using expire_pair_type = intervehicle::protobuf::ExpireMessagePair;
760 this->innermost()
761 .template subscribe<intervehicle::groups::modem_expire_in, expire_pair_type>(
762 [this](const expire_pair_type& expire_pair)
763 { this->template _handle_ack_or_expire<1>(expire_pair); });
764
765 this->innermost().template subscribe<intervehicle::groups::modem_driver_ready, bool>(
766 [this](const bool& ready)
767 {
768 goby::glog.is_debug1() && goby::glog << "Received driver ready" << std::endl;
769 ++drivers_ready_;
770 });
771
772 // set up before drivers ready to ensure we don't miss subscriptions
773 if (cfg_.has_persist_subscriptions())
774 _set_up_persistent_subscriptions();
775
776 for (int i = 0, n = cfg_.link_size(); i < n; ++i)
777 {
778 auto* link = cfg_.mutable_link(i);
779
780 link->mutable_driver()->set_modem_id(link->modem_id());
781 link->mutable_mac()->set_modem_id(link->modem_id());
782
783 modem_drivers_.emplace_back(new ModemDriverData);
784 ModemDriverData& data = *modem_drivers_.back();
785
786 data.underlying_thread.reset(new std::thread(
787 [&data, link]()
788 {
789 try
790 {
791 data.modem_driver_thread.reset(
792 new intervehicle::ModemDriverThread<implementation_tag>(*link));
793 data.modem_driver_thread->run(data.driver_thread_alive);
794 }
795 catch (std::exception& e)
796 {
798 goby::glog << "Modem driver thread had uncaught exception: " << e.what()
799 << std::endl;
800 throw;
801 }
802 }));
803
804 if (goby::glog.buf().is_gui())
805 // allows for visual grouping of each link in the NCurses gui
806 std::this_thread::sleep_for(std::chrono::milliseconds(250));
807 }
808
809 while (drivers_ready_ < modem_drivers_.size())
810 {
811 goby::glog.is_debug1() && goby::glog << "Waiting for drivers to be ready." << std::endl;
812 this->poll();
813 std::this_thread::sleep_for(std::chrono::seconds(1));
814 }
815
816 // write subscriptions after drivers ready to ensure they aren't missed
817 if (former_sub_collection_.subscription_size() > 0)
818 {
820 goby::glog << "Begin loading subscriptions from persistent storage..." << std::endl;
821 for (const auto& sub : former_sub_collection_.subscription())
822 this->innermost()
823 .template publish<intervehicle::groups::modem_subscription_forward_rx,
824 intervehicle::protobuf::Subscription,
825 MarshallingScheme::PROTOBUF>(sub);
826 }
827 }
828
829 void _set_up_persistent_subscriptions()
830 {
831 const auto& dir = cfg_.persist_subscriptions().dir();
832 if (dir.empty())
833 goby::glog.is_die() && goby::glog << "persist_subscriptions.dir cannot be empty"
834 << std::endl;
835
836 std::stringstream file_name;
837 file_name << dir;
838 if (dir.back() != '/')
839 file_name << "/";
840 file_name << "goby_intervehicle_subscriptions_" << cfg_.persist_subscriptions().name()
841 << ".pb.txt";
842 persist_sub_file_name_ = file_name.str();
843 {
844 std::ifstream persist_sub_ifs(persist_sub_file_name_.c_str());
845 try
846 {
847 if (persist_sub_ifs.is_open())
848 {
849 google::protobuf::TextFormat::Parser parser;
850 google::protobuf::io::IstreamInputStream iis(&persist_sub_ifs);
851 parser.Parse(&iis, &former_sub_collection_);
852 }
853 else
854 {
856 goby::glog << "Could not open persistent subscriptions file: "
857 << persist_sub_file_name_
858 << ". Assuming no persistent subscriptions exist" << std::endl;
859 }
860 }
861 catch (const std::exception& e)
862 {
864 goby::glog << "Error reading persistent subscriptions file: " << e.what()
865 << std::endl;
866 }
867 }
868
869 std::ofstream persist_sub_ofs(persist_sub_file_name_.c_str());
870 if (!persist_sub_ofs.is_open())
871 {
872 goby::glog.is_die() &&
873 goby::glog << "Could not open persistent subscriptions file for writing: "
874 << persist_sub_file_name_ << std::endl;
875 }
876 remove(persist_sub_file_name_.c_str());
877
878 this->innermost().template subscribe<intervehicle::groups::subscription_report>(
879 [this](const intervehicle::protobuf::SubscriptionReport& report)
880 {
881 goby::glog.is_debug1() && goby::glog << "Received subscription report: "
882 << report.ShortDebugString() << std::endl;
883 sub_reports_[report.link_modem_id()] = report;
884 std::ofstream persist_sub_ofs(persist_sub_file_name_.c_str());
885 intervehicle::protobuf::SubscriptionPersistCollection collection;
886 collection.set_time_with_units(
887 goby::time::SystemClock::now<goby::time::MicroTime>());
888 for (auto report_p : sub_reports_)
889 {
890 for (const auto& sub : report_p.second.subscription())
891 *collection.add_subscription() = sub;
892 }
893 google::protobuf::TextFormat::Printer printer;
894 google::protobuf::io::OstreamOutputStream oos(&persist_sub_ofs);
896 goby::glog << "Collection: " << collection.ShortDebugString() << std::endl;
897 printer.Print(collection, &oos);
898 });
899 }
900
901 private:
902 intervehicle::protobuf::PortalConfig cfg_;
903
904 struct ModemDriverData
905 {
906 std::unique_ptr<std::thread> underlying_thread;
907 std::unique_ptr<intervehicle::ModemDriverThread<implementation_tag>> modem_driver_thread;
908 std::atomic<bool> driver_thread_alive{true};
909 };
910 std::vector<std::unique_ptr<ModemDriverData>> modem_drivers_;
911 unsigned drivers_ready_{0};
912
913 std::deque<intervehicle::protobuf::DCCLForwardedData> received_;
914
915 intervehicle::protobuf::SubscriptionPersistCollection former_sub_collection_;
916 std::string persist_sub_file_name_;
917 std::map<modem_id_type, intervehicle::protobuf::SubscriptionReport> sub_reports_;
918};
919} // namespace middleware
920} // namespace goby
921
922#endif
simple exception class for goby applications
Definition exception.h:35
static const ::PROTOBUF_NAMESPACE_ID::Descriptor * descriptor()
Definition buffer.pb.h:110
boost::units::unit< ttl_dimension, boost::units::si::system > ttl_unit
Definition buffer.pb.h:302
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:61
static constexpr std::uint32_t broadcast_group
Special group number representing the broadcast group (used when no grouping is required for a given ...
Definition group.h:64
static constexpr std::uint32_t invalid_numeric_group
Special group number representing an invalid numeric group (unsuitable for intervehicle and outer lay...
Definition group.h:66
Implements the forwarder concept for the intervehicle layer.
InterVehicleForwarder(InnerTransporter &inner)
Construct a forwarder for the intervehicle layer.
typename InnerTransporter::implementation_tag implementation_tag
Implements a portal for the intervehicle layer based on Goby Acomms.
InterVehiclePortal(InnerTransporter &inner, const intervehicle::protobuf::PortalConfig &cfg)
Instantiate a portal with the given configuration and a reference to an external inner transporter.
InterVehiclePortal(const intervehicle::protobuf::PortalConfig &cfg)
Instantiate a portal with the given configuration (with the portal owning the inner transporter)
Base class for implementing transporters (both portal and forwarder) for the intervehicle layer.
std::shared_ptr< intervehicle::protobuf::Subscription > _serialize_subscription(const Group &group, const Subscriber< Data > &subscriber, SubscriptionAction action)
static constexpr int scheme()
returns the marshalling scheme id for a given data type on this layer. Only MarshallingScheme::DCCL i...
void publish_dynamic(std::shared_ptr< Data > data, const Group &group=Group(), const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to mutable data variant)....
void subscribe_dynamic(std::function< void(std::shared_ptr< const Data >)> f, const Group &group=Group(), const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (shared pointer variant)....
void subscribe_dynamic(std::function< void(const Data &)> f, const Group &group=Group(), const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (const reference variant)....
void _handle_ack_or_expire(const AckorExpirePair &ack_or_expire_pair)
void publish_dynamic(std::shared_ptr< const Data > data, const Group &group=Group(), const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to const data variant)....
InterVehicleTransporterBase(InnerTransporter &inner)
std::shared_ptr< intervehicle::protobuf::Subscription > _set_up_subscribe(std::function< void(std::shared_ptr< const Data > d)> func, const Group &group, const Subscriber< Data > &subscriber, SubscriptionAction action)
void _receive(const intervehicle::protobuf::DCCLForwardedData &packets)
void _insert_pending_ack(int dccl_id, std::shared_ptr< goby::middleware::protobuf::SerializerTransporterMessage > data, std::shared_ptr< SerializationHandlerBase< intervehicle::protobuf::AckData > > ack_handler, std::shared_ptr< SerializationHandlerBase< intervehicle::protobuf::ExpireData > > expire_handler)
void publish_dynamic(const Data &data, const Group &group=Group(), const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (const reference variant)....
void unsubscribe_dynamic(const Group &group=Group(), const Subscriber< Data > &subscriber=Subscriber< Data >())
Unsubscribe from a specific run-time defined group and data type. Where possible, prefer the static v...
void check_validity()
Check validity of the Group for interthread use (at compile time)
std::unordered_map< int, std::unordered_map< std::string, std::shared_ptr< const SerializationHandlerBase< intervehicle::protobuf::Header > > > > subscriptions_
std::shared_ptr< goby::middleware::protobuf::SerializerTransporterMessage > _set_up_publish(const Data &d, const Group &group, const Publisher< Data > &publisher)
Represents a subscription to a serialized data type (intervehicle layer).
InvalidPublication(const std::string &e)
InvalidSubscription(const std::string &e)
InvalidUnsubscription(const std::string &e)
int poll(const std::chrono::time_point< Clock, Duration > &timeout=std::chrono::time_point< Clock, Duration >::max())
poll for data. Blocks until a data event occurs or a timeout when a particular time has been reached
Definition interface.h:376
Utility class for allowing the various Goby middleware transporters to poll the underlying transport ...
Definition poller.h:39
Represents a callback for a published data type (e.g. acked_func or expired_func)
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition publisher.h:40
const goby::middleware::protobuf::TransporterConfig & cfg() const
Returns the metadata configuration.
Definition publisher.h:81
acked_func_type acked_func() const
Returns the acked data callback (or an empty function if none is set)
Definition publisher.h:91
bool has_set_group_func() const
Definition publisher.h:95
expired_func_type expired_func() const
Returns the expired data callback (or an empty function if none is set)
Definition publisher.h:93
Base class for handling posting callbacks for serialized data types (interprocess and outer)
Defines the common interface for publishing and subscribing data using static (constexpr) groups on G...
Definition interface.h:235
void subscribe(std::function< void(const Data &)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (const reference variant)
Definition interface.h:301
void publish(const Data &data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (const reference variant)
Definition interface.h:246
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
Definition subscriber.h:37
subscribed_func_type subscribed_func() const
Definition subscriber.h:92
const goby::middleware::protobuf::TransporterConfig & cfg() const
Definition subscriber.h:80
subscribe_expired_func_type subscribe_expired_func() const
Definition subscriber.h:94
Provides the modem driver thread used by InterVehiclePortal, templated on ImplementationTag so it use...
const ::goby::middleware::intervehicle::protobuf::Header & header() const
const ::goby::middleware::intervehicle::protobuf::DCCLPacket & frame(int index) const
const ::goby::middleware::intervehicle::protobuf::PortalConfig_PersistSubscriptions & persist_subscriptions() const
::goby::middleware::intervehicle::protobuf::PortalConfig_LinkConfig * mutable_link(int index)
static const ::PROTOBUF_NAMESPACE_ID::Descriptor * descriptor()
const ::goby::acomms::protobuf::DynamicBufferConfig & buffer() const
void set_protobuf_name(ArgT0 &&arg0, ArgT... args)
const ::goby::middleware::intervehicle::protobuf::TransporterConfig & intervehicle() const
goby::util::logger::GroupSetter group(std::string n)
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::goby::acomms::protobuf::ModemReport, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::acomms::iridium::protobuf::Report >, 11, false > report
constexpr Group modem_subscription_forward_tx
Definition groups.h:48
constexpr Group modem_subscription_forward_rx
Definition groups.h:50
std::shared_ptr< goby::middleware::protobuf::SerializerTransporterMessage > serialize_publication(const Data &d, const Group &group, const Publisher< Data > &publisher)
constexpr T e
Definition constants.h:35
The global namespace for the Goby project.
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::PROTOBUF_NAMESPACE_ID::MessageOptions, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
util::FlexOstream glog
Access the Goby logger through this object.
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
Definition interface.h:98
static std::string type_name()
The marshalling scheme specific string name for this type.
Definition interface.h:107
#define GOBY_INTERVEHICLE_API_VERSION
Definition version.h:35