Goby3 3.4.0
2026.04.13
Loading...
Searching...
No Matches
intervehicle.h
Go to the documentation of this file.
1// Copyright 2016-2026:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6//
7//
8// This file is part of the Goby Underwater Autonomy Project Libraries
9// ("The Goby Libraries").
10//
11// The Goby Libraries are free software: you can redistribute them and/or modify
12// them under the terms of the GNU Lesser General Public License as published by
13// the Free Software Foundation, either version 2.1 of the License, or
14// (at your option) any later version.
15//
16// The Goby Libraries are distributed in the hope that they will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU Lesser General Public License for more details.
20//
21// You should have received a copy of the GNU Lesser General Public License
22// along with Goby. If not, see <http://www.gnu.org/licenses/>.
23
24#ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERVEHICLE_H
25#define GOBY_MIDDLEWARE_TRANSPORT_INTERVEHICLE_H
26
27#include <atomic>
28#include <functional>
29#include <sys/types.h>
30#include <thread>
31#include <unistd.h>
32
33#include <google/protobuf/io/zero_copy_stream_impl.h>
34
36
38#include "goby/middleware/transport/interthread.h" // used for InterVehiclePortal implementation
42
43namespace goby
44{
45namespace middleware
46{
// Exception thrown by _set_up_subscribe() when a subscription request is rejected
// (non-broadcast Group without a group_func, or a broadcast subscription with ack_required).
// NOTE(review): the class head (listing line 47) is missing from this listing.
48{
49 public:
// Forwards the human-readable error text to the Exception base class.
50 InvalidSubscription(const std::string& e) : Exception(e) {}
51};
52
// Exception thrown by _set_up_publish() when publishing to a non-broadcast Group
// with a Publisher that has no set_group_func.
// NOTE(review): the class head (listing line 53) is missing from this listing.
54{
55 public:
// Forwards the human-readable error text to the Exception base class.
56 InvalidPublication(const std::string& e) : Exception(e) {}
57};
58
// Exception thrown by _set_up_subscribe() when asked to unsubscribe from a DCCL id / group
// for which no subscription is stored. The forwarder and portal _subscribe() methods catch it
// and log the message as a warning.
// NOTE(review): the class head (listing line 59) is missing from this listing.
60{
61 public:
// Forwards the human-readable error text to the Exception base class.
62 InvalidUnsubscription(const std::string& e) : Exception(e) {}
63};
64
69template <typename Derived, typename InnerTransporter>
71 : public StaticTransporterInterface<InterVehicleTransporterBase<Derived, InnerTransporter>,
72 InnerTransporter>,
73 public Poller<InterVehicleTransporterBase<Derived, InnerTransporter>>
74{
75 using InterfaceType =
77 InnerTransporter>;
78
80
81 public:
83 {
86 };
87
// Constructor body. The signature line(s) are missing from this listing; the tooltip section
// of SOURCE gives it as InterVehicleTransporterBase(InnerTransporter &inner).
// Registers a callback that maintains omit_publish_metadata_, the set of type names for which
// _set_up_publish() does not attach Protobuf metadata.
90 {
91 // handle request from Portal to omit or include metadata on future publications for a given data type
92 this->inner()
// NOTE(review): listing lines 93-94 (the call that registers this lambda on the inner
// transporter) are missing here -- presumably a subscribe<...>() call; confirm against the header.
95 [this](const protobuf::SerializerMetadataRequest& request)
96 {
97 glog.is_debug3() && glog << "Received DCCL metadata request: "
98 << request.ShortDebugString() << std::endl;
99
100 switch (request.request())
101 {
// NOTE(review): both case labels (listing lines 102 and 105) are missing here.
// erase: metadata is attached again for this type on later publications
103 omit_publish_metadata_.erase(request.key().type());
104 break;
// insert: metadata is left out for this type on later publications
106 omit_publish_metadata_.insert(request.key().type());
107 break;
108 }
109 });
110 }
112
// Virtual, compiler-generated destructor.
113 virtual ~InterVehicleTransporterBase() = default;
114
// Returns the marshalling scheme id for Data on this layer; the static_assert rejects, at
// compile time, types that are not DCCL messages.
// NOTE(review): listing lines 119 (right-hand side of the comparison) and 121 (the return
// statement) are missing here; presumably both name MarshallingScheme::DCCL -- confirm.
116 template <typename Data> static constexpr int scheme()
117 {
118 static_assert(goby::middleware::scheme<typename detail::primitive_type<Data>::type>() ==
120 "Can only use DCCL messages with InterVehicleTransporters");
122 }
123
127 template <const Group& group> void check_validity()
128 {
129 static_assert(group.numeric() != Group::invalid_numeric_group,
130 "goby::middleware::Group must have non-zero numeric "
131 "value to publish on the InterVehicle layer");
132 }
133
141 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
142 void publish_dynamic(const Data& data, const Group& group = Group(),
143 const Publisher<Data>& publisher = Publisher<Data>())
144 {
145 static_assert(scheme == MarshallingScheme::DCCL,
146 "Can only use DCCL messages with InterVehicleTransporters");
147
148 Data data_with_group = data;
149 publisher.set_group(data_with_group, group);
150
151 static_cast<Derived*>(this)->template _publish<Data>(data_with_group, group, publisher);
152 // publish to interprocess as both DCCL and Protobuf
153 this->inner().template publish_dynamic<Data, MarshallingScheme::DCCL>(data_with_group,
154 group, publisher);
155 this->inner().template publish_dynamic<Data, MarshallingScheme::PROTOBUF>(data_with_group,
156 group, publisher);
157 }
158
166 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
167 void publish_dynamic(std::shared_ptr<const Data> data, const Group& group = Group(),
168 const Publisher<Data>& publisher = Publisher<Data>())
169 {
170 static_assert(scheme == MarshallingScheme::DCCL,
171 "Can only use DCCL messages with InterVehicleTransporters");
172 if (data)
173 {
174 // copy this way as it allows us to copy Data == google::protobuf::Message abstract base class
175 std::shared_ptr<Data> data_with_group(data->New());
176 data_with_group->CopyFrom(*data);
177
178 publisher.set_group(*data_with_group, group);
179
180 static_cast<Derived*>(this)->template _publish<Data>(*data_with_group, group,
181 publisher);
182
183 // publish to interprocess as both DCCL and Protobuf
184 this->inner().template publish_dynamic<Data, MarshallingScheme::DCCL>(data_with_group,
185 group, publisher);
186 this->inner().template publish_dynamic<Data, MarshallingScheme::PROTOBUF>(
187 data_with_group, group, publisher);
188 }
189 }
190
198 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
199 void publish_dynamic(std::shared_ptr<Data> data, const Group& group = Group(),
200 const Publisher<Data>& publisher = Publisher<Data>())
201 {
202 publish_dynamic<Data, scheme>(std::shared_ptr<const Data>(data), group, publisher);
203 }
204
212 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
213 void subscribe_dynamic(std::function<void(const Data&)> f, const Group& group = Group(),
214 const Subscriber<Data>& subscriber = Subscriber<Data>())
215 {
216 static_assert(scheme == MarshallingScheme::DCCL,
217 "Can only use DCCL messages with InterVehicleTransporters");
218 auto pointer_ref_lambda = [=](std::shared_ptr<const Data> d) { f(*d); };
219 static_cast<Derived*>(this)->template _subscribe<Data>(
220 pointer_ref_lambda, group, subscriber, SubscriptionAction::SUBSCRIBE);
221 }
222
// Subscribe to a run-time defined group and data type (shared pointer variant).
//
// f:          callback invoked with a shared pointer to each received message
// group:      group to subscribe to
// subscriber: optional subscription metadata/callbacks
//
// Only DCCL messages are accepted (enforced at compile time).
230 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
231 void subscribe_dynamic(std::function<void(std::shared_ptr<const Data>)> f,
232 const Group& group = Group(),
233 const Subscriber<Data>& subscriber = Subscriber<Data>())
234 {
235 static_assert(scheme == MarshallingScheme::DCCL,
236 "Can only use DCCL messages with InterVehicleTransporters");
// NOTE(review): listing line 238 (the final argument and closing parenthesis) is missing
// here; presumably SubscriptionAction::SUBSCRIBE, as in the const reference overload.
237 static_cast<Derived*>(this)->template _subscribe<Data>(f, group, subscriber,
239 }
240
// Unsubscribe from a run-time defined group and data type.
// NOTE(review): listing line 248 (function name and first parameter) is missing here; the
// tooltip section of SOURCE gives the signature as
// void unsubscribe_dynamic(const Group& group = Group(), const Subscriber<Data>& subscriber = ...).
247 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
249 const Subscriber<Data>& subscriber = Subscriber<Data>())
250 {
251 static_assert(scheme == MarshallingScheme::DCCL,
252 "Can only use DCCL messages with InterVehicleTransporters");
// an empty callback is passed: nothing is delivered for an unsubscription
// NOTE(review): listing line 255 (the action argument) is missing here -- presumably the
// unsubscribe value of SubscriptionAction; confirm.
253 static_cast<Derived*>(this)->template _subscribe<Data>(
254 std::function<void(std::shared_ptr<const Data>)>(), group, subscriber,
256 }
257
258 protected:
// Validates and serializes a publication, and returns the serialized message for the
// derived class to send.
//
// d:         message to publish
// group:     destination group
// publisher: publication metadata and callbacks
//
// Throws InvalidPublication if `group` is not the broadcast group and `publisher` has no
// set_group_func.
259 template <typename Data>
260 std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage>
261 _set_up_publish(const Data& d, const Group& group, const Publisher<Data>& publisher)
262 {
263 if (group.numeric() != Group::broadcast_group && !publisher.has_set_group_func())
264 {
265 std::stringstream ss;
266 ss << "Error: Publisher must have set_group_func in order to publish to a "
267 "non-broadcast Group ("
268 << group
269 << "). The set_group_func modifies the contents of the outgoing message to store "
270 "the group information.";
271 throw(InvalidPublication(ss.str()));
272 }
273
274 auto data = intervehicle::serialize_publication(d, group, publisher);
275
// when an acknowledgment is required, build handlers wrapping the publisher's
// acked_func / expired_func
// NOTE(review): listing lines 279, 284 and 287 are missing here (template arguments of the
// handlers and the call that receives data/ack_handler/expire_handler -- presumably
// _insert_pending_ack(); confirm).
276 if (publisher.cfg().intervehicle().buffer().ack_required())
277 {
278 auto ack_handler = std::make_shared<
280 publisher.acked_func(), d);
281
282 auto expire_handler =
283 std::make_shared<PublisherCallback<Data, MarshallingScheme::DCCL,
285 publisher.expired_func(), d);
286
288 data, ack_handler, expire_handler);
289 }
290
// attach Protobuf metadata unless a metadata request has asked to omit it for this type
291 if (!omit_publish_metadata_.count(data->key().type()))
292 _set_protobuf_metadata<Data>(data->mutable_key()->mutable_metadata(), d);
293
// NOTE(review): listing line 294 (presumably the glog verbosity guard) is missing here.
295 goby::glog << "Set up publishing for: " << data->ShortDebugString() << std::endl;
296
297 return data;
298 }
299
// Records (or removes) a local subscription, registers ack/expire handlers for the
// subscription message itself, and returns the Subscription message for the derived class
// to send.
//
// func:       callback for received data (stored with the subscription)
// group:      group being subscribed to / unsubscribed from
// subscriber: subscription metadata and callbacks
// action:     whether to subscribe or unsubscribe
//
// Throws InvalidSubscription for an invalid subscribe request and InvalidUnsubscription
// when no matching subscription exists to remove.
300 template <typename Data>
301 std::shared_ptr<intervehicle::protobuf::Subscription>
302 _set_up_subscribe(std::function<void(std::shared_ptr<const Data> d)> func, const Group& group,
303 const Subscriber<Data>& subscriber, SubscriptionAction action)
304 {
// NOTE(review): listing line 305 is missing here; presumably it declares `dccl_id`, which
// is used below -- confirm.
306
307 switch (action)
308 {
// NOTE(review): the case label on listing line 309 is missing; this branch stores a
// subscription, so presumably it is the subscribe case.
310 {
311 if (group.numeric() != Group::broadcast_group && !subscriber.has_group_func())
312 {
313 std::stringstream ss;
314 ss << "Error: Subscriber must have group_func in order to subscribe to "
315 "non-broadcast Group ("
316 << group
317 << "). The group_func returns the appropriate Group based on the contents "
318 "of the incoming message.";
319 throw(InvalidSubscription(ss.str()));
320 }
321
322 if (subscriber.cfg().intervehicle().broadcast() &&
323 subscriber.cfg().intervehicle().buffer().ack_required())
324 {
325 std::stringstream ss;
326 ss << "Error: Broadcast subscriptions cannot have ack_required: true";
327 throw(InvalidSubscription(ss.str()));
328 }
329
// NOTE(review): listing line 331 (the subscription handler type) is missing here.
330 auto subscription = std::make_shared<
332 func, group, subscriber);
333
// replaces any subscription already stored for this DCCL id and group
334 this->subscriptions_[dccl_id][group] = subscription;
335 }
336 break;
// NOTE(review): the case label on listing line 337 is missing; this branch erases a
// subscription, so presumably it is the unsubscribe case.
338 {
339 auto sub_it = this->subscriptions_[dccl_id].find(group);
340 if (sub_it != this->subscriptions_[dccl_id].end())
341 {
342 this->subscriptions_[dccl_id].erase(sub_it);
343 }
344 else
345 {
346 std::stringstream ss;
347 ss << "Cannot unsubscribe to DCCL id: " << dccl_id
348 << " and group: " << std::string(group) << " as no subscription was found.";
349 throw(InvalidUnsubscription(ss.str()));
350 }
351 }
352 break;
353 }
354
355 auto dccl_subscription =
356 this->template _serialize_subscription<Data>(group, subscriber, action);
358 // insert pending subscription
// NOTE(review): listing lines 360-361 (arguments to serialize_publication) are missing here.
359 auto subscription_publication = intervehicle::serialize_publication(
362
363 // overwrite timestamps to ensure mapping with driver threads
364 auto subscribe_time = dccl_subscription->time_with_units();
365 subscription_publication->mutable_key()->set_serialize_time_with_units(subscribe_time);
366
// handlers wrap the subscriber's subscribed_func / subscribe_expired_func
// NOTE(review): listing lines 368 and 373 (remaining template arguments) are missing here.
367 auto ack_handler = std::make_shared<PublisherCallback<Subscription, MarshallingScheme::DCCL,
369 subscriber.subscribed_func());
370
371 auto expire_handler =
372 std::make_shared<PublisherCallback<Subscription, MarshallingScheme::DCCL,
374 subscriber.subscribe_expired_func());
375
376 goby::glog.is_debug1() && goby::glog << "Inserting subscription ack handler for "
377 << subscription_publication->ShortDebugString()
378 << std::endl;
379
380 this->pending_ack_.insert(std::make_pair(*subscription_publication,
381 std::make_tuple(ack_handler, expire_handler)));
382
383 return dccl_subscription;
384 }
385
// Looks up the pending_ack_ entry for the message carried in `ack_or_expire_pair` and posts
// the ack/expire notification to the stored handler.
//
// tuple_index: which handler of the pending_ack_ tuple to call; callers in this file use
//              0 for acks and 1 for expirations
// ack_or_expire_pair: provides serializer() (the original message) and data() (the ack or
//              expire message)
//
// The entry is not erased here; _expire_pending_ack() removes entries later.
386 template <int tuple_index, typename AckorExpirePair>
387 void _handle_ack_or_expire(const AckorExpirePair& ack_or_expire_pair)
388 {
389 auto original = ack_or_expire_pair.serializer();
390 const auto& ack_or_expire_msg = ack_or_expire_pair.data();
// NOTE(review): listing line 393 (the type name compared against) is missing here;
// presumably the Subscription message's type name -- confirm.
391 bool is_subscription = original.key().marshalling_scheme() == MarshallingScheme::DCCL &&
392 original.key().type() ==
394
395 if (is_subscription)
396 {
397 // rewrite data to remove src()
398 auto bytes_begin = original.data().begin(), bytes_end = original.data().end();
399 decltype(bytes_begin) actual_end;
400
// NOTE(review): listing lines 401-402 are missing here; presumably they declare `Helper`.
403 auto subscription = Helper::parse(bytes_begin, bytes_end, actual_end);
// _serialize_subscription() also sets src to 0; presumably this makes the lookup key
// match the one stored in pending_ack_ -- confirm
404 subscription->mutable_header()->set_src(0);
405
406 std::vector<char> bytes(Helper::serialize(*subscription));
// set_allocated_data() hands ownership of this heap string to `original`
407 std::string* sbytes = new std::string(bytes.begin(), bytes.end());
408 original.set_allocated_data(sbytes);
409 }
410
411 auto it = pending_ack_.find(original);
412 if (it != pending_ack_.end())
413 {
414 goby::glog.is_debug3() && goby::glog << ack_or_expire_msg.GetDescriptor()->name()
415 << " for: " << original.ShortDebugString() << ", "
416 << ack_or_expire_msg.ShortDebugString()
417 << std::endl;
418
419 std::get<tuple_index>(it->second)
420 ->post(original.data().begin(), original.data().end(), ack_or_expire_msg);
421 }
422 else
423 {
424 goby::glog.is_debug3() && goby::glog << "No pending Ack/Expire for "
425 << (is_subscription ? "subscription: " : "data: ")
426 << original.ShortDebugString() << std::endl;
427 }
428 }
429
// Body of _receive(); the signature line is missing from this listing (the tooltip section
// of SOURCE gives it as void _receive(const intervehicle::protobuf::DCCLForwardedData &packets)).
// Hands every received frame to each subscription stored for that frame's DCCL id, together
// with the packet header.
431 {
432 goby::glog.is_debug3() && goby::glog << "Received DCCLForwarded data: "
433 << packets.ShortDebugString() << std::endl;
434
435 for (const auto& packet : packets.frame())
436 {
// NOTE(review): operator[] default-inserts an empty inner map for a dccl_id that has no
// subscriptions, and `auto p` copies each (group, shared_ptr) pair per iteration.
437 for (auto p : this->subscriptions_[packet.dccl_id()])
438 p.second->post(packet.data().begin(), packet.data().end(), packets.header());
439 }
440 }
441
// Builds the intervehicle Subscription message describing a subscribe/unsubscribe request.
// NOTE(review): listing line 444 (function name and leading parameters) is missing here; the
// tooltip section of SOURCE gives the signature as
// _serialize_subscription(const Group &group, const Subscriber<Data> &subscriber, SubscriptionAction action).
442 template <typename Data>
443 std::shared_ptr<intervehicle::protobuf::Subscription>
445 SubscriptionAction action)
446 {
// NOTE(review): listing line 447 is missing here; presumably it declares `dccl_id`, which
// is used below -- confirm.
448 auto dccl_subscription = std::make_shared<intervehicle::protobuf::Subscription>();
// source is always written as 0 here
449 dccl_subscription->mutable_header()->set_src(0);
450
// one destination per configured publisher id
451 for (auto id : subscriber.cfg().intervehicle().publisher_id())
452 dccl_subscription->mutable_header()->add_dest(id);
453
454 dccl_subscription->set_api_version(GOBY_INTERVEHICLE_API_VERSION);
455 dccl_subscription->set_dccl_id(dccl_id);
456 dccl_subscription->set_group(group.numeric());
457 dccl_subscription->set_time_with_units(
458 goby::time::SystemClock::now<goby::time::MicroTime>());
// NOTE(review): listing lines 460-461 (the two values selected by this comparison) are
// missing here.
459 dccl_subscription->set_action((action == SubscriptionAction::SUBSCRIBE)
462
463 _set_protobuf_metadata<Data>(dccl_subscription->mutable_metadata());
464 *dccl_subscription->mutable_intervehicle() = subscriber.cfg().intervehicle();
465 return dccl_subscription;
466 }
467
// Stores the ack and expire handlers for a published message in pending_ack_, keyed by a
// copy of the serialized message.
// NOTE(review): listing lines 468 and 470-471 (function name and the ack_handler parameter)
// are missing here; the tooltip section of SOURCE gives the full signature of
// _insert_pending_ack(int dccl_id, data, ack_handler, expire_handler).
// `dccl_id` is not used in the visible body.
469 int dccl_id, std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage> data,
472 expire_handler)
473 {
474 goby::glog.is_debug3() && goby::glog << "Inserting ack handler for "
475 << data->ShortDebugString() << std::endl;
476
477 this->pending_ack_.insert(
478 std::make_pair(*data, std::make_tuple(ack_handler, expire_handler)));
479 }
480
481 protected:
482 // maps DCCL ID to map of Group->subscription
483 // only one subscription allowed per IntervehicleForwarder/Portal (new subscription overwrites old one)
484 std::unordered_map<
485 int, std::unordered_map<std::string, std::shared_ptr<const SerializationHandlerBase<
488
489 private:
490 friend PollerType;
491 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock)
492 {
493 _expire_pending_ack();
494
495 return static_cast<Derived*>(this)->_poll(lock);
496 }
497
// Fills `meta` for the statically known type Data: appends the file descriptor of Data's
// .proto file and of all files it depends on.
// NOTE(review): listing line 500 is missing here; presumably it sets the Protobuf type name
// on `meta`, as the two-argument overload does -- confirm.
498 template <typename Data> void _set_protobuf_metadata(protobuf::SerializerProtobufMetadata* meta)
499 {
501 _insert_file_desc_with_dependencies(Data::descriptor()->file(), meta);
502 }
503
// Fills `meta` using the run-time descriptor of the message instance `d`: sets the Protobuf
// type name and appends the file descriptor of d's .proto file and of all files it depends on.
504 template <typename Data>
505 void _set_protobuf_metadata(protobuf::SerializerProtobufMetadata* meta, const Data& d)
506 {
// NOTE(review): listing line 508 (the argument supplying the type name) is missing here.
507 meta->set_protobuf_name(
509 _insert_file_desc_with_dependencies(d.GetDescriptor()->file(), meta);
510 }
511
512 // used to populated InterVehicleSubscription file_descriptor fields
513 void _insert_file_desc_with_dependencies(const google::protobuf::FileDescriptor* file_desc,
514 protobuf::SerializerProtobufMetadata* meta)
515 {
516 for (int i = 0, n = file_desc->dependency_count(); i < n; ++i)
517 _insert_file_desc_with_dependencies(file_desc->dependency(i), meta);
518
519 google::protobuf::FileDescriptorProto* file_desc_proto = meta->add_file_descriptor();
520 file_desc->CopyTo(file_desc_proto);
521 }
522
523 // expire any pending_ack entries that are no longer relevant
// An entry is erased once `now` is later than its serialize time plus the maximum TTL plus a
// one second grace period. Scanning stops at the first entry that has not yet expired.
524 void _expire_pending_ack()
525 {
526 auto now = goby::time::SystemClock::now<goby::time::MicroTime>();
527 for (auto it = pending_ack_.begin(), end = pending_ack_.end(); it != end;)
528 {
// NOTE(review): listing lines 529 and 534 are missing here; presumably they declare
// `max_ttl` from the `max` DCCL option of the "ttl" field (line 529) and apply its unit
// (line 534) -- confirm.
530 ->FindFieldByName("ttl")
531 ->options()
532 .GetExtension(dccl::field)
533 .max() *
535
536 decltype(now) serialize_time(it->first.key().serialize_time_with_units());
537 decltype(now) expire_time(serialize_time + max_ttl);
538
539 // time to let any expire messages from the drivers propagate through the interprocess layer before we remove this
540 const decltype(now) interprocess_wait(1.0 * boost::units::si::seconds);
541
542 // loop through pending ack, and clear any at the front that can be removed
543
544 if (now > expire_time + interprocess_wait)
545 {
546 goby::glog.is_debug3() && goby::glog << "Erasing pending ack for "
547 << it->first.ShortDebugString() << std::endl;
// erase() returns the iterator to the next entry, so the loop needs no separate increment
548 it = pending_ack_.erase(it);
549 }
550 else
551 {
552 // pending_ack_ is ordered by serialize time, so we can bail now
553 break;
554 }
555 }
556 }
557
558 private:
559 // maps data with ack_requested onto callbacks for when the data are acknowledged or expire
560 // ordered by serialize time
561 std::map<
562 protobuf::SerializerTransporterMessage,
563 std::tuple<std::shared_ptr<SerializationHandlerBase<intervehicle::protobuf::AckData>>,
564 std::shared_ptr<SerializationHandlerBase<intervehicle::protobuf::ExpireData>>>>
565 pending_ack_;
566
567 // map of Protobuf names where we can omit metadata on publication
568 std::set<std::string> omit_publish_metadata_;
569};
570
575template <typename InnerTransporter>
577 : public InterVehicleTransporterBase<InterVehicleForwarder<InnerTransporter>, InnerTransporter>
578{
579 public:
580 using implementation_tag = typename InnerTransporter::implementation_tag;
581
582 using Base =
584
// Construct a forwarder for the intervehicle layer on top of `inner`.
// Subscribes on the inner transporter so that received data, acks and expirations are routed
// to _receive() and _handle_ack_or_expire().
588 InterVehicleForwarder(InnerTransporter& inner) : Base(inner)
589 {
590 this->inner()
// NOTE(review): listing lines 591-593 (the subscribe call and the lambda's parameter list)
// are missing here; the lambda forwards `msg` to _receive().
594 { this->_receive(msg); });
595
// index 0 selects the ack handler
596 using ack_pair_type = intervehicle::protobuf::AckMessagePair;
597 this->inner().template subscribe<intervehicle::groups::modem_ack_in, ack_pair_type>(
598 [this](const ack_pair_type& ack_pair)
599 { this->template _handle_ack_or_expire<0>(ack_pair); });
600
// index 1 selects the expire handler
601 using expire_pair_type = intervehicle::protobuf::ExpireMessagePair;
602 this->inner().template subscribe<intervehicle::groups::modem_expire_in, expire_pair_type>(
603 [this](const expire_pair_type& expire_pair)
604 { this->template _handle_ack_or_expire<1>(expire_pair); });
605 }
606
// Virtual, compiler-generated destructor.
607 virtual ~InterVehicleForwarder() = default;
608
609 friend Base;
610
611 private:
612 template <typename Data>
613 void _publish(const Data& d, const Group& group, const Publisher<Data>& publisher)
614 {
615 this->inner().template publish<intervehicle::groups::modem_data_out>(
616 this->_set_up_publish(d, group, publisher));
617 }
618
// Sets up the (un)subscription via _set_up_subscribe() and publishes the resulting
// Subscription message on the inner transporter.
// An InvalidUnsubscription is logged as a warning and otherwise ignored; other exceptions
// propagate to the caller.
619 template <typename Data>
620 void _subscribe(std::function<void(std::shared_ptr<const Data> d)> func, const Group& group,
621 const Subscriber<Data>& subscriber, typename Base::SubscriptionAction action)
622 {
623 try
624 {
625 this->inner()
// NOTE(review): listing lines 626-628 (the publish call and its template arguments) are
// missing here.
629 this->_set_up_subscribe(func, group, subscriber, action));
630 }
631 catch (const InvalidUnsubscription& e)
632 {
633 goby::glog.is_warn() && goby::glog << e.what() << std::endl;
634 }
635 }
636
// Poll hook called by the base class: the forwarder does no polling work of its own and
// always returns 0. The lock is left untouched.
int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& /*lock*/)
{
    return 0;
}
638};
639
643template <typename InnerTransporter>
645 : public InterVehicleTransporterBase<InterVehiclePortal<InnerTransporter>, InnerTransporter>
646{
647 // Derive the ImplementationTag from InnerTransporter so that the modem driver thread
648 // uses the same InterProcessForwarder prefix as the portal's inner transporter.
649 using implementation_tag = typename InnerTransporter::implementation_tag;
650 using modem_id_type = typename goby::middleware::intervehicle::ModemDriverThread<
651 implementation_tag>::modem_id_type;
652
653 public:
654 using Base =
656
660 InterVehiclePortal(const intervehicle::protobuf::PortalConfig& cfg) : cfg_(cfg) { _init(); }
661
// Instantiate a portal with the given configuration and a reference to an external inner
// transporter. Stores a copy of `cfg` and runs _init().
// NOTE(review): the signature line (listing line 666) is missing here; the tooltip section of
// SOURCE gives it as InterVehiclePortal(InnerTransporter &inner, const PortalConfig &cfg).
667 : Base(inner), cfg_(cfg)
668 {
669 _init();
670 }
671
// Body of the destructor (the signature line, listing line 672, is missing here -- assumed
// from context; confirm). For every modem driver: clears the flag that was passed to the
// driver's run() and then waits for its thread to finish.
673 {
674 for (auto& modem_driver_data : modem_drivers_)
675 {
676 modem_driver_data->driver_thread_alive = false;
677 if (modem_driver_data->underlying_thread)
678 modem_driver_data->underlying_thread->join();
679 }
680 }
681
682 friend Base;
683
684 private:
685 template <typename Data>
686 void _publish(const Data& d, const Group& group, const Publisher<Data>& publisher)
687 {
688 this->innermost().template publish<intervehicle::groups::modem_data_out>(
689 this->_set_up_publish(d, group, publisher));
690 }
691
692 template <typename Data>
693 void _subscribe(std::function<void(std::shared_ptr<const Data> d)> func, const Group& group,
694 const Subscriber<Data>& subscriber, typename Base::SubscriptionAction action)
695 {
696 try
697 {
698 auto dccl_subscription = this->_set_up_subscribe(func, group, subscriber, action);
699
700 this->innermost().template publish<intervehicle::groups::modem_subscription_forward_tx>(
701 dccl_subscription);
702 }
703 catch (const InvalidUnsubscription& e)
704 {
705 goby::glog.is_warn() && goby::glog << e.what() << std::endl;
706 }
707 }
708
// Poll hook called by the base class: drains the received_ queue, handing each queued
// message to _receive(), and returns the number of messages handled.
709 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock)
710 {
711 int items = 0;
// NOTE(review): listing line 712 is missing here.
713 while (!received_.empty())
714 {
715 this->_receive(received_.front());
716 received_.pop_front();
717 ++items;
// the caller's lock (if any) is destroyed after the first handled message
718 if (lock)
719 lock.reset();
720 }
721 return items;
722 }
723
// Common constructor logic for the portal:
//  1. registers a subscription for forwarded Subscription messages,
//  2. subscribes on the innermost transporter to incoming data, acks, expirations and
//     driver-ready notifications,
//  3. optionally loads persisted subscriptions,
//  4. starts one modem driver thread per configured link and waits until all report ready,
//  5. re-publishes any subscriptions loaded from persistent storage.
724 void _init()
725 {
726 // set up reception of forwarded (via acoustic) subscriptions,
727 // then re-publish to driver threads
728 {
729 using intervehicle::protobuf::Subscription;
// NOTE(review): `[=]` captures `this` implicitly (deprecated since C++20).
730 auto subscribe_lambda = [=](std::shared_ptr<const Subscription> d)
731 {
732 this->innermost()
// NOTE(review): listing lines 733 and 735 (the publish call, its group and its arguments)
// are missing here.
734 intervehicle::protobuf::Subscription,
736 };
737 auto subscription = std::make_shared<
738 IntervehicleSerializationSubscription<Subscription, MarshallingScheme::DCCL>>(
739 subscribe_lambda);
740
741 auto dccl_id = SerializerParserHelper<Subscription, MarshallingScheme::DCCL>::id();
742 this->subscriptions_[dccl_id].insert(
743 std::make_pair(subscription->subscribed_group(), subscription));
744 }
745
// incoming data is only queued here; _poll() later hands it to _receive()
746 this->innermost().template subscribe<intervehicle::groups::modem_data_in>(
747 [this](const intervehicle::protobuf::DCCLForwardedData& msg)
748 { received_.push_back(msg); });
749
750 // a message requiring ack can be disposed by either [1] ack, [2] expire (TTL exceeded), [3] having no subscribers, [4] queue size exceeded.
751 // post the correct callback (ack for [1] and expire for [2-4])
752 // and remove the pending ack message
753 using ack_pair_type = intervehicle::protobuf::AckMessagePair;
754 this->innermost().template subscribe<intervehicle::groups::modem_ack_in, ack_pair_type>(
755 [this](const ack_pair_type& ack_pair)
756 { this->template _handle_ack_or_expire<0>(ack_pair); });
757
758 using expire_pair_type = intervehicle::protobuf::ExpireMessagePair;
759 this->innermost()
760 .template subscribe<intervehicle::groups::modem_expire_in, expire_pair_type>(
761 [this](const expire_pair_type& expire_pair)
762 { this->template _handle_ack_or_expire<1>(expire_pair); });
763
// counts ready notifications; compared against the number of drivers further below
764 this->innermost().template subscribe<intervehicle::groups::modem_driver_ready, bool>(
765 [this](const bool& ready)
766 {
767 goby::glog.is_debug1() && goby::glog << "Received driver ready" << std::endl;
768 ++drivers_ready_;
769 });
770
771 // set up before drivers ready to ensure we don't miss subscriptions
772 if (cfg_.has_persist_subscriptions())
773 _set_up_persistent_subscriptions();
774
775 for (int i = 0, n = cfg_.link_size(); i < n; ++i)
776 {
777 auto* link = cfg_.mutable_link(i);
778
// copy the link's modem id into its driver and MAC configuration
779 link->mutable_driver()->set_modem_id(link->modem_id());
780 link->mutable_mac()->set_modem_id(link->modem_id());
781
782 modem_drivers_.emplace_back(new ModemDriverData);
783 ModemDriverData& data = *modem_drivers_.back();
784
// the thread lambda holds a reference to `data` (heap-allocated, owned by modem_drivers_)
// and a pointer to the link configuration inside cfg_
785 data.underlying_thread.reset(new std::thread(
786 [&data, link]()
787 {
788 try
789 {
790 data.modem_driver_thread.reset(
791 new intervehicle::ModemDriverThread<implementation_tag>(*link));
792 data.modem_driver_thread->run(data.driver_thread_alive);
793 }
794 catch (std::exception& e)
795 {
// NOTE(review): listing line 796 (presumably the glog verbosity guard) is missing here.
797 goby::glog << "Modem driver thread had uncaught exception: " << e.what()
798 << std::endl;
// the exception is rethrown out of the thread function after logging
799 throw;
800 }
801 }));
802
803 if (goby::glog.buf().is_gui())
804 // allows for visual grouping of each link in the NCurses gui
805 std::this_thread::sleep_for(std::chrono::milliseconds(250));
806 }
807
// block until every driver has reported ready, polling once per second
808 while (drivers_ready_ < modem_drivers_.size())
809 {
810 goby::glog.is_debug1() && goby::glog << "Waiting for drivers to be ready." << std::endl;
811 this->poll();
812 std::this_thread::sleep_for(std::chrono::seconds(1));
813 }
814
815 // write subscriptions after drivers ready to ensure they aren't missed
816 if (former_sub_collection_.subscription_size() > 0)
817 {
// NOTE(review): listing line 818 (presumably the glog verbosity guard) is missing here.
819 goby::glog << "Begin loading subscriptions from persistent storage..." << std::endl;
820 for (const auto& sub : former_sub_collection_.subscription())
821 this->innermost()
822 .template publish<intervehicle::groups::modem_subscription_forward_rx,
823 intervehicle::protobuf::Subscription,
824 MarshallingScheme::PROTOBUF>(sub);
825 }
826 }
827
// Loads previously persisted subscriptions into former_sub_collection_ and arranges for the
// persistence file to be rewritten whenever a subscription report arrives.
// The file is <dir>/goby_intervehicle_subscriptions_<name>.pb.txt (Protobuf text format),
// with <dir> and <name> taken from cfg_.persist_subscriptions().
828 void _set_up_persistent_subscriptions()
829 {
830 const auto& dir = cfg_.persist_subscriptions().dir();
831 if (dir.empty())
832 goby::glog.is_die() && goby::glog << "persist_subscriptions.dir cannot be empty"
833 << std::endl;
834
835 std::stringstream file_name;
836 file_name << dir;
837 if (dir.back() != '/')
838 file_name << "/";
839 file_name << "goby_intervehicle_subscriptions_" << cfg_.persist_subscriptions().name()
840 << ".pb.txt";
841 persist_sub_file_name_ = file_name.str();
// read any existing file; a missing file is not an error
842 {
843 std::ifstream persist_sub_ifs(persist_sub_file_name_.c_str());
844 try
845 {
846 if (persist_sub_ifs.is_open())
847 {
848 google::protobuf::TextFormat::Parser parser;
849 google::protobuf::io::IstreamInputStream iis(&persist_sub_ifs);
// NOTE(review): the boolean result of Parse() is not checked.
850 parser.Parse(&iis, &former_sub_collection_);
851 }
852 else
853 {
// NOTE(review): listing line 854 (presumably the glog verbosity guard) is missing here.
855 goby::glog << "Could not open persistent subscriptions file: "
856 << persist_sub_file_name_
857 << ". Assuming no persistent subscriptions exist" << std::endl;
858 }
859 }
860 catch (const std::exception& e)
861 {
// NOTE(review): listing line 862 (presumably the glog verbosity guard) is missing here.
863 goby::glog << "Error reading persistent subscriptions file: " << e.what()
864 << std::endl;
865 }
866 }
867
// check that the file can be opened for writing; opening it this way truncates it, and it
// is then removed again below
868 std::ofstream persist_sub_ofs(persist_sub_file_name_.c_str());
869 if (!persist_sub_ofs.is_open())
870 {
871 goby::glog.is_die() &&
872 goby::glog << "Could not open persistent subscriptions file for writing: "
873 << persist_sub_file_name_ << std::endl;
874 }
875 remove(persist_sub_file_name_.c_str());
876
// on every report: remember it per link modem id, then rewrite the whole file from all
// remembered reports
877 this->innermost().template subscribe<intervehicle::groups::subscription_report>(
878 [this](const intervehicle::protobuf::SubscriptionReport& report)
879 {
880 goby::glog.is_debug1() && goby::glog << "Received subscription report: "
881 << report.ShortDebugString() << std::endl;
882 sub_reports_[report.link_modem_id()] = report;
883 std::ofstream persist_sub_ofs(persist_sub_file_name_.c_str());
884 intervehicle::protobuf::SubscriptionPersistCollection collection;
885 collection.set_time_with_units(
886 goby::time::SystemClock::now<goby::time::MicroTime>());
// NOTE(review): `auto report_p` copies each stored (modem id, report) pair per iteration.
887 for (auto report_p : sub_reports_)
888 {
889 for (const auto& sub : report_p.second.subscription())
890 *collection.add_subscription() = sub;
891 }
892 google::protobuf::TextFormat::Printer printer;
893 google::protobuf::io::OstreamOutputStream oos(&persist_sub_ofs);
// NOTE(review): listing line 894 (presumably the glog verbosity guard) is missing here.
895 goby::glog << "Collection: " << collection.ShortDebugString() << std::endl;
896 printer.Print(collection, &oos);
897 });
898 }
899
900 private:
901 intervehicle::protobuf::PortalConfig cfg_;
902
// Per-link bookkeeping for one modem driver; one instance is created per configured link
// in _init().
903 struct ModemDriverData
904 {
// thread started in _init() that constructs and runs the driver; joined in the portal destructor
905 std::unique_ptr<std::thread> underlying_thread;
// driver object; created inside the thread started by _init()
906 std::unique_ptr<intervehicle::ModemDriverThread<implementation_tag>> modem_driver_thread;
// flag passed to the driver's run(); set to false by the portal destructor before joining
907 std::atomic<bool> driver_thread_alive{true};
908 };
909 std::vector<std::unique_ptr<ModemDriverData>> modem_drivers_;
910 unsigned drivers_ready_{0};
911
912 std::deque<intervehicle::protobuf::DCCLForwardedData> received_;
913
914 intervehicle::protobuf::SubscriptionPersistCollection former_sub_collection_;
915 std::string persist_sub_file_name_;
916 std::map<modem_id_type, intervehicle::protobuf::SubscriptionReport> sub_reports_;
917};
918} // namespace middleware
919} // namespace goby
920
921#endif
simple exception class for goby applications
Definition exception.h:35
static const ::PROTOBUF_NAMESPACE_ID::Descriptor * descriptor()
Definition buffer.pb.h:110
boost::units::unit< ttl_dimension, boost::units::si::system > ttl_unit
Definition buffer.pb.h:302
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:60
static constexpr std::uint32_t broadcast_group
Special group number representing the broadcast group (used when no grouping is required for a given ...
Definition group.h:63
static constexpr std::uint32_t invalid_numeric_group
Special group number representing an invalid numeric group (unsuitable for intervehicle and outer lay...
Definition group.h:65
Implements the forwarder concept for the intervehicle layer.
InterVehicleForwarder(InnerTransporter &inner)
Construct a forwarder for the intervehicle layer.
typename InnerTransporter::implementation_tag implementation_tag
Implements a portal for the intervehicle layer based on Goby Acomms.
InterVehiclePortal(InnerTransporter &inner, const intervehicle::protobuf::PortalConfig &cfg)
Instantiate a portal with the given configuration and a reference to an external inner transporter.
InterVehiclePortal(const intervehicle::protobuf::PortalConfig &cfg)
Instantiate a portal with the given configuration (with the portal owning the inner transporter)
Base class for implementing transporters (both portal and forwarder) for the intervehicle layer.
std::shared_ptr< intervehicle::protobuf::Subscription > _serialize_subscription(const Group &group, const Subscriber< Data > &subscriber, SubscriptionAction action)
static constexpr int scheme()
returns the marshalling scheme id for a given data type on this layer. Only MarshallingScheme::DCCL i...
void publish_dynamic(std::shared_ptr< Data > data, const Group &group=Group(), const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to mutable data variant)....
void subscribe_dynamic(std::function< void(std::shared_ptr< const Data >)> f, const Group &group=Group(), const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (shared pointer variant)....
void subscribe_dynamic(std::function< void(const Data &)> f, const Group &group=Group(), const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (const reference variant)....
void _handle_ack_or_expire(const AckorExpirePair &ack_or_expire_pair)
void publish_dynamic(std::shared_ptr< const Data > data, const Group &group=Group(), const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to const data variant)....
InterVehicleTransporterBase(InnerTransporter &inner)
std::shared_ptr< intervehicle::protobuf::Subscription > _set_up_subscribe(std::function< void(std::shared_ptr< const Data > d)> func, const Group &group, const Subscriber< Data > &subscriber, SubscriptionAction action)
void _receive(const intervehicle::protobuf::DCCLForwardedData &packets)
void _insert_pending_ack(int dccl_id, std::shared_ptr< goby::middleware::protobuf::SerializerTransporterMessage > data, std::shared_ptr< SerializationHandlerBase< intervehicle::protobuf::AckData > > ack_handler, std::shared_ptr< SerializationHandlerBase< intervehicle::protobuf::ExpireData > > expire_handler)
void publish_dynamic(const Data &data, const Group &group=Group(), const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (const reference variant)....
void unsubscribe_dynamic(const Group &group=Group(), const Subscriber< Data > &subscriber=Subscriber< Data >())
Unsubscribe from a specific run-time defined group and data type. Where possible, prefer the static v...
void check_validity()
Check validity of the Group for intervehicle use (at compile time)
std::unordered_map< int, std::unordered_map< std::string, std::shared_ptr< const SerializationHandlerBase< intervehicle::protobuf::Header > > > > subscriptions_
std::shared_ptr< goby::middleware::protobuf::SerializerTransporterMessage > _set_up_publish(const Data &d, const Group &group, const Publisher< Data > &publisher)
Represents a subscription to a serialized data type (intervehicle layer).
InvalidPublication(const std::string &e)
InvalidSubscription(const std::string &e)
InvalidUnsubscription(const std::string &e)
int poll(const std::chrono::time_point< Clock, Duration > &timeout=std::chrono::time_point< Clock, Duration >::max())
Poll for data. Blocks until a data event occurs or the given timeout time point is reached.
Definition interface.h:375
Utility class for allowing the various Goby middleware transporters to poll the underlying transport ...
Definition poller.h:38
Represents a callback for a published data type (e.g. acked_func or expired_func)
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition publisher.h:40
const goby::middleware::protobuf::TransporterConfig & cfg() const
Returns the metadata configuration.
Definition publisher.h:81
acked_func_type acked_func() const
Returns the acked data callback (or an empty function if none is set)
Definition publisher.h:91
bool has_set_group_func() const
Definition publisher.h:95
expired_func_type expired_func() const
Returns the expired data callback (or an empty function if none is set)
Definition publisher.h:93
Base class for handling posting callbacks for serialized data types (interprocess and outer)
Defines the common interface for publishing and subscribing data using static (constexpr) groups on G...
Definition interface.h:234
void subscribe(std::function< void(const Data &)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (const reference variant)
Definition interface.h:300
void publish(const Data &data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (const reference variant)
Definition interface.h:245
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
Definition subscriber.h:37
subscribed_func_type subscribed_func() const
Definition subscriber.h:92
const goby::middleware::protobuf::TransporterConfig & cfg() const
Definition subscriber.h:80
subscribe_expired_func_type subscribe_expired_func() const
Definition subscriber.h:94
Provides the modem driver thread used by InterVehiclePortal, templated on ImplementationTag so it use...
const ::goby::middleware::intervehicle::protobuf::Header & header() const
const ::goby::middleware::intervehicle::protobuf::DCCLPacket & frame(int index) const
const ::goby::middleware::intervehicle::protobuf::PortalConfig_PersistSubscriptions & persist_subscriptions() const
::goby::middleware::intervehicle::protobuf::PortalConfig_LinkConfig * mutable_link(int index)
static const ::PROTOBUF_NAMESPACE_ID::Descriptor * descriptor()
const ::goby::acomms::protobuf::DynamicBufferConfig & buffer() const
void set_protobuf_name(ArgT0 &&arg0, ArgT... args)
const ::goby::middleware::intervehicle::protobuf::TransporterConfig & intervehicle() const
goby::util::logger::GroupSetter group(std::string n)
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::goby::acomms::protobuf::ModemReport, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::acomms::iridium::protobuf::Report >, 11, false > report
constexpr Group modem_subscription_forward_tx
Definition groups.h:48
constexpr Group modem_subscription_forward_rx
Definition groups.h:50
std::shared_ptr< goby::middleware::protobuf::SerializerTransporterMessage > serialize_publication(const Data &d, const Group &group, const Publisher< Data > &publisher)
constexpr T e
Definition constants.h:35
The global namespace for the Goby project.
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::PROTOBUF_NAMESPACE_ID::MessageOptions, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
util::FlexOstream glog
Access the Goby logger through this object.
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
Definition interface.h:98
static std::string type_name()
The marshalling scheme specific string name for this type.
Definition interface.h:107
#define GOBY_INTERVEHICLE_API_VERSION
Definition version.h:35