Goby3 3.2.3
2025.05.13
Loading...
Searching...
No Matches
intervehicle.h
Go to the documentation of this file.
1// Copyright 2016-2024:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6//
7//
8// This file is part of the Goby Underwater Autonomy Project Libraries
9// ("The Goby Libraries").
10//
11// The Goby Libraries are free software: you can redistribute them and/or modify
12// them under the terms of the GNU Lesser General Public License as published by
13// the Free Software Foundation, either version 2.1 of the License, or
14// (at your option) any later version.
15//
16// The Goby Libraries are distributed in the hope that they will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU Lesser General Public License for more details.
20//
21// You should have received a copy of the GNU Lesser General Public License
22// along with Goby. If not, see <http://www.gnu.org/licenses/>.
23
24#ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERVEHICLE_H
25#define GOBY_MIDDLEWARE_TRANSPORT_INTERVEHICLE_H
26
27#include <atomic>
28#include <functional>
29#include <sys/types.h>
30#include <thread>
31#include <unistd.h>
32
33#include <google/protobuf/io/zero_copy_stream_impl.h>
34
36
38#include "goby/middleware/transport/interthread.h" // used for InterVehiclePortal implementation
42
43namespace goby
44{
45namespace middleware
46{
48{
49 public:
50 InvalidSubscription(const std::string& e) : Exception(e) {}
51};
52
54{
55 public:
56 InvalidPublication(const std::string& e) : Exception(e) {}
57};
58
60{
61 public:
62 InvalidUnsubscription(const std::string& e) : Exception(e) {}
63};
64
69template <typename Derived, typename InnerTransporter>
71 : public StaticTransporterInterface<InterVehicleTransporterBase<Derived, InnerTransporter>,
72 InnerTransporter>,
73 public Poller<InterVehicleTransporterBase<Derived, InnerTransporter>>
74{
75 using InterfaceType =
77 InnerTransporter>;
78
80
81 public:
83 {
86 };
87
90 {
91 // handle request from Portal to omit or include metadata on future publications for a given data type
92 this->inner()
95 [this](const protobuf::SerializerMetadataRequest& request)
96 {
97 glog.is_debug3() && glog << "Received DCCL metadata request: "
98 << request.ShortDebugString() << std::endl;
99
100 switch (request.request())
101 {
103 omit_publish_metadata_.erase(request.key().type());
104 break;
106 omit_publish_metadata_.insert(request.key().type());
107 break;
108 }
109 });
110 }
112
113 virtual ~InterVehicleTransporterBase() = default;
114
116 template <typename Data> static constexpr int scheme()
117 {
118 static_assert(goby::middleware::scheme<typename detail::primitive_type<Data>::type>() ==
120 "Can only use DCCL messages with InterVehicleTransporters");
122 }
123
// Compile-time validity check for a statically-known Group on this layer.
// The static_assert below rejects (at build time) any `group` whose numeric()
// value equals Group::invalid_numeric_group; there is no run-time effect.
127 template <const Group& group> void check_validity()
128 {
129 static_assert(group.numeric() != Group::invalid_numeric_group,
130 "goby::middleware::Group must have non-zero numeric "
131 "value to publish on the InterVehicle layer");
132 }
133
// Publish `data` to a run-time defined `group` (const reference variant).
//
// Template parameters:
//   Data   - message type being published.
//   scheme - marshalling scheme for Data; anything other than
//            MarshallingScheme::DCCL fails the static_assert below.
// Parameters:
//   data      - message to publish; it is copied and the caller's object is not modified.
//   group     - destination group (defaults to a default-constructed Group).
//   publisher - optional Publisher; its set_group() is applied to the copy.
//
// The copy is handed to Derived::_publish and then re-published twice on the
// inner transporter, once as DCCL and once as PROTOBUF.
141 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
142 void publish_dynamic(const Data& data, const Group& group = Group(),
143 const Publisher<Data>& publisher = Publisher<Data>())
144 {
145 static_assert(scheme == MarshallingScheme::DCCL,
146 "Can only use DCCL messages with InterVehicleTransporters");
147
    // work on a copy so that set_group() can write the group into the message
148 Data data_with_group = data;
149 publisher.set_group(data_with_group, group);
150
    // CRTP dispatch to the portal/forwarder specific implementation
151 static_cast<Derived*>(this)->template _publish<Data>(data_with_group, group, publisher);
152 // publish to interprocess as both DCCL and Protobuf
153 this->inner().template publish_dynamic<Data, MarshallingScheme::DCCL>(data_with_group,
154 group, publisher);
155 this->inner().template publish_dynamic<Data, MarshallingScheme::PROTOBUF>(data_with_group,
156 group, publisher);
157 }
158
// Publish `data` to a run-time defined `group` (shared pointer to const data variant).
//
// Template parameters:
//   Data   - message type being published.
//   scheme - marshalling scheme for Data; anything other than
//            MarshallingScheme::DCCL fails the static_assert below.
// Parameters:
//   data      - message to publish; a null pointer makes this call a no-op.
//   group     - destination group (defaults to a default-constructed Group).
//   publisher - optional Publisher; its set_group() is applied to a fresh copy
//               of the message, never to *data.
//
// The copy is passed by reference to Derived::_publish and then re-published
// (as a shared pointer) on the inner transporter as both DCCL and PROTOBUF.
166 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
167 void publish_dynamic(std::shared_ptr<const Data> data, const Group& group = Group(),
168 const Publisher<Data>& publisher = Publisher<Data>())
169 {
170 static_assert(scheme == MarshallingScheme::DCCL,
171 "Can only use DCCL messages with InterVehicleTransporters");
    // silently ignore null pointers: nothing is published anywhere
172 if (data)
173 {
174 // copy this way as it allows us to copy Data == google::protobuf::Message abstract base class
    // the shared_ptr takes ownership of the object returned by New()
175 std::shared_ptr<Data> data_with_group(data->New());
176 data_with_group->CopyFrom(*data);
177
178 publisher.set_group(*data_with_group, group);
179
180 static_cast<Derived*>(this)->template _publish<Data>(*data_with_group, group,
181 publisher);
182
183 // publish to interprocess as both DCCL and Protobuf
184 this->inner().template publish_dynamic<Data, MarshallingScheme::DCCL>(data_with_group,
185 group, publisher);
186 this->inner().template publish_dynamic<Data, MarshallingScheme::PROTOBUF>(
187 data_with_group, group, publisher);
188 }
189 }
190
// Publish `data` to a run-time defined `group` (shared pointer to mutable data variant).
// Pure convenience overload: converts the pointer to std::shared_ptr<const Data>
// and forwards to the const-pointer overload with the same group and publisher.
198 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
199 void publish_dynamic(std::shared_ptr<Data> data, const Group& group = Group(),
200 const Publisher<Data>& publisher = Publisher<Data>())
201 {
202 publish_dynamic<Data, scheme>(std::shared_ptr<const Data>(data), group, publisher);
203 }
204
// Subscribe to a run-time defined `group` and data type (const reference callback variant).
//
// Template parameters:
//   Data   - message type to receive.
//   scheme - marshalling scheme for Data; anything other than
//            MarshallingScheme::DCCL fails the static_assert below.
// Parameters:
//   f          - callback invoked with a const reference to each received message.
//   group      - group to subscribe to (defaults to a default-constructed Group).
//   subscriber - optional Subscriber metadata/callbacks.
//
// The callback is adapted to the shared-pointer form expected by
// Derived::_subscribe, which is called with SubscriptionAction::SUBSCRIBE.
212 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
213 void subscribe_dynamic(std::function<void(const Data&)> f, const Group& group = Group(),
214 const Subscriber<Data>& subscriber = Subscriber<Data>())
215 {
216 static_assert(scheme == MarshallingScheme::DCCL,
217 "Can only use DCCL messages with InterVehicleTransporters");
    // adapter: holds a copy of f and dereferences the pointer before calling it
218 auto pointer_ref_lambda = [=](std::shared_ptr<const Data> d) { f(*d); };
219 static_cast<Derived*>(this)->template _subscribe<Data>(
220 pointer_ref_lambda, group, subscriber, SubscriptionAction::SUBSCRIBE);
221 }
222
230 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
231 void subscribe_dynamic(std::function<void(std::shared_ptr<const Data>)> f,
232 const Group& group = Group(),
233 const Subscriber<Data>& subscriber = Subscriber<Data>())
234 {
235 static_assert(scheme == MarshallingScheme::DCCL,
236 "Can only use DCCL messages with InterVehicleTransporters");
237 static_cast<Derived*>(this)->template _subscribe<Data>(f, group, subscriber,
239 }
240
247 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
249 const Subscriber<Data>& subscriber = Subscriber<Data>())
250 {
251 static_assert(scheme == MarshallingScheme::DCCL,
252 "Can only use DCCL messages with InterVehicleTransporters");
253 static_cast<Derived*>(this)->template _subscribe<Data>(
254 std::function<void(std::shared_ptr<const Data>)>(), group, subscriber,
256 }
257
258 protected:
259 template <typename Data>
260 std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage>
261 _set_up_publish(const Data& d, const Group& group, const Publisher<Data>& publisher)
262 {
263 if (group.numeric() != Group::broadcast_group && !publisher.has_set_group_func())
264 {
265 std::stringstream ss;
266 ss << "Error: Publisher must have set_group_func in order to publish to a "
267 "non-broadcast Group ("
268 << group
269 << "). The set_group_func modifies the contents of the outgoing message to store "
270 "the group information.";
271 throw(InvalidPublication(ss.str()));
272 }
273
274 auto data = intervehicle::serialize_publication(d, group, publisher);
275
276 if (publisher.cfg().intervehicle().buffer().ack_required())
277 {
278 auto ack_handler = std::make_shared<
280 publisher.acked_func(), d);
281
282 auto expire_handler =
283 std::make_shared<PublisherCallback<Data, MarshallingScheme::DCCL,
285 publisher.expired_func(), d);
286
288 data, ack_handler, expire_handler);
289 }
290
291 if (!omit_publish_metadata_.count(data->key().type()))
292 _set_protobuf_metadata<Data>(data->mutable_key()->mutable_metadata(), d);
293
295 goby::glog << "Set up publishing for: " << data->ShortDebugString() << std::endl;
296
297 return data;
298 }
299
300 template <typename Data>
301 std::shared_ptr<intervehicle::protobuf::Subscription>
302 _set_up_subscribe(std::function<void(std::shared_ptr<const Data> d)> func, const Group& group,
303 const Subscriber<Data>& subscriber, SubscriptionAction action)
304 {
306
307 switch (action)
308 {
310 {
311 if (group.numeric() != Group::broadcast_group && !subscriber.has_group_func())
312 {
313 std::stringstream ss;
314 ss << "Error: Subscriber must have group_func in order to subscribe to "
315 "non-broadcast Group ("
316 << group
317 << "). The group_func returns the appropriate Group based on the contents "
318 "of the incoming message.";
319 throw(InvalidSubscription(ss.str()));
320 }
321
322 if (subscriber.cfg().intervehicle().broadcast() &&
323 subscriber.cfg().intervehicle().buffer().ack_required())
324 {
325 std::stringstream ss;
326 ss << "Error: Broadcast subscriptions cannot have ack_required: true";
327 throw(InvalidSubscription(ss.str()));
328 }
329
330 auto subscription = std::make_shared<
332 func, group, subscriber);
333
334 this->subscriptions_[dccl_id][group] = subscription;
335 }
336 break;
338 {
339 auto sub_it = this->subscriptions_[dccl_id].find(group);
340 if (sub_it != this->subscriptions_[dccl_id].end())
341 {
342 this->subscriptions_[dccl_id].erase(sub_it);
343 }
344 else
345 {
346 std::stringstream ss;
347 ss << "Cannot unsubscribe to DCCL id: " << dccl_id
348 << " and group: " << std::string(group) << " as no subscription was found.";
349 throw(InvalidUnsubscription(ss.str()));
350 }
351 }
352 break;
353 }
354
355 auto dccl_subscription =
356 this->template _serialize_subscription<Data>(group, subscriber, action);
358 // insert pending subscription
359 auto subscription_publication = intervehicle::serialize_publication(
362
363 // overwrite timestamps to ensure mapping with driver threads
364 auto subscribe_time = dccl_subscription->time_with_units();
365 subscription_publication->mutable_key()->set_serialize_time_with_units(subscribe_time);
366
367 auto ack_handler = std::make_shared<PublisherCallback<Subscription, MarshallingScheme::DCCL,
369 subscriber.subscribed_func());
370
371 auto expire_handler =
372 std::make_shared<PublisherCallback<Subscription, MarshallingScheme::DCCL,
374 subscriber.subscribe_expired_func());
375
376 goby::glog.is_debug1() && goby::glog << "Inserting subscription ack handler for "
377 << subscription_publication->ShortDebugString()
378 << std::endl;
379
380 this->pending_ack_.insert(std::make_pair(*subscription_publication,
381 std::make_tuple(ack_handler, expire_handler)));
382
383 return dccl_subscription;
384 }
385
386 template <int tuple_index, typename AckorExpirePair>
387 void _handle_ack_or_expire(const AckorExpirePair& ack_or_expire_pair)
388 {
389 auto original = ack_or_expire_pair.serializer();
390 const auto& ack_or_expire_msg = ack_or_expire_pair.data();
391 bool is_subscription = original.key().marshalling_scheme() == MarshallingScheme::DCCL &&
392 original.key().type() ==
394
395 if (is_subscription)
396 {
397 // rewrite data to remove src()
398 auto bytes_begin = original.data().begin(), bytes_end = original.data().end();
399 decltype(bytes_begin) actual_end;
400
403 auto subscription = Helper::parse(bytes_begin, bytes_end, actual_end);
404 subscription->mutable_header()->set_src(0);
405
406 std::vector<char> bytes(Helper::serialize(*subscription));
407 std::string* sbytes = new std::string(bytes.begin(), bytes.end());
408 original.set_allocated_data(sbytes);
409 }
410
411 auto it = pending_ack_.find(original);
412 if (it != pending_ack_.end())
413 {
414 goby::glog.is_debug3() && goby::glog << ack_or_expire_msg.GetDescriptor()->name()
415 << " for: " << original.ShortDebugString() << ", "
416 << ack_or_expire_msg.ShortDebugString()
417 << std::endl;
418
419 std::get<tuple_index>(it->second)
420 ->post(original.data().begin(), original.data().end(), ack_or_expire_msg);
421 }
422 else
423 {
424 goby::glog.is_debug3() && goby::glog << "No pending Ack/Expire for "
425 << (is_subscription ? "subscription: " : "data: ")
426 << original.ShortDebugString() << std::endl;
427 }
428 }
429
431 {
432 goby::glog.is_debug3() && goby::glog << "Received DCCLForwarded data: "
433 << packets.ShortDebugString() << std::endl;
434
435 for (const auto& packet : packets.frame())
436 {
437 for (auto p : this->subscriptions_[packet.dccl_id()])
438 p.second->post(packet.data().begin(), packet.data().end(), packets.header());
439 }
440 }
441
442 template <typename Data>
443 std::shared_ptr<intervehicle::protobuf::Subscription>
445 SubscriptionAction action)
446 {
448 auto dccl_subscription = std::make_shared<intervehicle::protobuf::Subscription>();
449 dccl_subscription->mutable_header()->set_src(0);
450
451 for (auto id : subscriber.cfg().intervehicle().publisher_id())
452 dccl_subscription->mutable_header()->add_dest(id);
453
454 dccl_subscription->set_api_version(GOBY_INTERVEHICLE_API_VERSION);
455 dccl_subscription->set_dccl_id(dccl_id);
456 dccl_subscription->set_group(group.numeric());
457 dccl_subscription->set_time_with_units(
458 goby::time::SystemClock::now<goby::time::MicroTime>());
459 dccl_subscription->set_action((action == SubscriptionAction::SUBSCRIBE)
462
463 _set_protobuf_metadata<Data>(dccl_subscription->mutable_metadata());
464 *dccl_subscription->mutable_intervehicle() = subscriber.cfg().intervehicle();
465 return dccl_subscription;
466 }
467
469 int dccl_id, std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage> data,
472 expire_handler)
473 {
474 goby::glog.is_debug3() && goby::glog << "Inserting ack handler for "
475 << data->ShortDebugString() << std::endl;
476
477 this->pending_ack_.insert(
478 std::make_pair(*data, std::make_tuple(ack_handler, expire_handler)));
479 }
480
481 protected:
482 // maps DCCL ID to map of Group->subscription
483 // only one subscription allowed per DCCL ID and Group on each InterVehicleForwarder/Portal (new subscription overwrites old one)
484 std::unordered_map<
485 int, std::unordered_map<std::string, std::shared_ptr<const SerializationHandlerBase<
488
489 private:
490 friend PollerType;
// Poll hook for this layer: first drops stale pending-ack entries via
// _expire_pending_ack(), then delegates to Derived::_poll with the same lock
// and returns whatever that call returns.
491 int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock)
492 {
493 _expire_pending_ack();
494
495 return static_cast<Derived*>(this)->_poll(lock);
496 }
497
498 template <typename Data> void _set_protobuf_metadata(protobuf::SerializerProtobufMetadata* meta)
499 {
501 _insert_file_desc_with_dependencies(Data::descriptor()->file(), meta);
502 }
503
504 template <typename Data>
505 void _set_protobuf_metadata(protobuf::SerializerProtobufMetadata* meta, const Data& d)
506 {
507 meta->set_protobuf_name(
509 _insert_file_desc_with_dependencies(d.GetDescriptor()->file(), meta);
510 }
511
512 // used to populate InterVehicleSubscription file_descriptor fields
// Appends `file_desc` and, recursively, all of its dependencies to
// meta's file_descriptor list. Dependencies are visited first, so each
// file's dependencies are added before the file itself.
// NOTE(review): nothing here de-duplicates, so a file reachable through more
// than one dependency path is appended once per path.
513 void _insert_file_desc_with_dependencies(const google::protobuf::FileDescriptor* file_desc,
514 protobuf::SerializerProtobufMetadata* meta)
515 {
516 for (int i = 0, n = file_desc->dependency_count(); i < n; ++i)
517 _insert_file_desc_with_dependencies(file_desc->dependency(i), meta);
518
    // dependencies are in; now add this file's own descriptor
519 google::protobuf::FileDescriptorProto* file_desc_proto = meta->add_file_descriptor();
520 file_desc->CopyTo(file_desc_proto);
521 }
522
523 // expire any pending_ack entries that are no longer relevant
524 void _expire_pending_ack()
525 {
526 auto now = goby::time::SystemClock::now<goby::time::MicroTime>();
527 for (auto it = pending_ack_.begin(), end = pending_ack_.end(); it != end;)
528 {
530 ->FindFieldByName("ttl")
531 ->options()
532 .GetExtension(dccl::field)
533 .max() *
535
536 decltype(now) serialize_time(it->first.key().serialize_time_with_units());
537 decltype(now) expire_time(serialize_time + max_ttl);
538
539 // time to let any expire messages from the drivers propagate through the interprocess layer before we remove this
540 const decltype(now) interprocess_wait(1.0 * boost::units::si::seconds);
541
542 // loop through pending ack, and clear any at the front that can be removed
543
544 if (now > expire_time + interprocess_wait)
545 {
546 goby::glog.is_debug3() && goby::glog << "Erasing pending ack for "
547 << it->first.ShortDebugString() << std::endl;
548 it = pending_ack_.erase(it);
549 }
550 else
551 {
552 // pending_ack_ is ordered by serialize time, so we can bail now
553 break;
554 }
555 }
556 }
557
558 private:
559 // maps data with ack_requested onto callbacks for when the data are acknowledged or expire
560 // ordered by serialize time
561 std::map<
562 protobuf::SerializerTransporterMessage,
563 std::tuple<std::shared_ptr<SerializationHandlerBase<intervehicle::protobuf::AckData>>,
564 std::shared_ptr<SerializationHandlerBase<intervehicle::protobuf::ExpireData>>>>
565 pending_ack_;
566
567 // set of Protobuf names where we can omit metadata on publication
568 std::set<std::string> omit_publish_metadata_;
569};
570
575template <typename InnerTransporter>
577 : public InterVehicleTransporterBase<InterVehicleForwarder<InnerTransporter>, InnerTransporter>
578{
579 public:
580 using Base =
582
586 InterVehicleForwarder(InnerTransporter& inner) : Base(inner)
587 {
588 this->inner()
592 { this->_receive(msg); });
593
594 using ack_pair_type = intervehicle::protobuf::AckMessagePair;
595 this->inner().template subscribe<intervehicle::groups::modem_ack_in, ack_pair_type>(
596 [this](const ack_pair_type& ack_pair)
597 { this->template _handle_ack_or_expire<0>(ack_pair); });
598
599 using expire_pair_type = intervehicle::protobuf::ExpireMessagePair;
600 this->inner().template subscribe<intervehicle::groups::modem_expire_in, expire_pair_type>(
601 [this](const expire_pair_type& expire_pair)
602 { this->template _handle_ack_or_expire<1>(expire_pair); });
603 }
604
605 virtual ~InterVehicleForwarder() = default;
606
607 friend Base;
608
609 private:
// Forwarder publish implementation (called from the base class via CRTP).
// Builds the serialized publication with Base::_set_up_publish and publishes
// the result on the inner transporter to intervehicle::groups::modem_data_out.
// Exceptions thrown by _set_up_publish are not caught here.
610 template <typename Data>
611 void _publish(const Data& d, const Group& group, const Publisher<Data>& publisher)
612 {
613 this->inner().template publish<intervehicle::groups::modem_data_out>(
614 this->_set_up_publish(d, group, publisher));
615 }
616
617 template <typename Data>
618 void _subscribe(std::function<void(std::shared_ptr<const Data> d)> func, const Group& group,
619 const Subscriber<Data>& subscriber, typename Base::SubscriptionAction action)
620 {
621 try
622 {
623 this->inner()
627 this->_set_up_subscribe(func, group, subscriber, action));
628 }
629 catch (const InvalidUnsubscription& e)
630 {
631 goby::glog.is_warn() && goby::glog << e.what() << std::endl;
632 }
633 }
634
// Forwarder poll implementation: does no work of its own, ignores `lock`, and always returns 0.
635 int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock) { return 0; }
636};
637
641template <typename InnerTransporter>
643 : public InterVehicleTransporterBase<InterVehiclePortal<InnerTransporter>, InnerTransporter>
644{
646
647 public:
648 using Base =
650
// Construct a portal from `cfg` only: the configuration is copied into cfg_,
// no Base initializer is given (so Base is default-constructed), and _init()
// is run before the constructor returns.
654 InterVehiclePortal(const intervehicle::protobuf::PortalConfig& cfg) : cfg_(cfg) { _init(); }
655
661 : Base(inner), cfg_(cfg)
662 {
663 _init();
664 }
665
667 {
668 for (auto& modem_driver_data : modem_drivers_)
669 {
670 modem_driver_data->driver_thread_alive = false;
671 if (modem_driver_data->underlying_thread)
672 modem_driver_data->underlying_thread->join();
673 }
674 }
675
676 friend Base;
677
678 private:
// Portal publish implementation (called from the base class via CRTP).
// Builds the serialized publication with Base::_set_up_publish and publishes
// it on innermost() to intervehicle::groups::modem_data_out.
// Exceptions thrown by _set_up_publish are not caught here.
679 template <typename Data>
680 void _publish(const Data& d, const Group& group, const Publisher<Data>& publisher)
681 {
682 this->innermost().template publish<intervehicle::groups::modem_data_out>(
683 this->_set_up_publish(d, group, publisher));
684 }
685
// Portal subscribe/unsubscribe implementation (called from the base class via CRTP).
//
// Parameters:
//   func       - callback to run for received messages of type Data.
//   group      - group the (un)subscription applies to.
//   subscriber - Subscriber metadata/callbacks.
//   action     - whether this is a subscribe or an unsubscribe request.
//
// Registers the request through Base::_set_up_subscribe and publishes the
// resulting Subscription message on innermost() to
// intervehicle::groups::modem_subscription_forward_tx.
// An InvalidUnsubscription is logged as a warning and swallowed (nothing is
// published in that case); any other exception propagates to the caller.
686 template <typename Data>
687 void _subscribe(std::function<void(std::shared_ptr<const Data> d)> func, const Group& group,
688 const Subscriber<Data>& subscriber, typename Base::SubscriptionAction action)
689 {
690 try
691 {
692 auto dccl_subscription = this->_set_up_subscribe(func, group, subscriber, action);
693
694 this->innermost().template publish<intervehicle::groups::modem_subscription_forward_tx>(
695 dccl_subscription);
696 }
697 catch (const InvalidUnsubscription& e)
698 {
699 goby::glog.is_warn() && goby::glog << e.what() << std::endl;
700 }
701 }
702
703 int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock)
704 {
705 int items = 0;
707 while (!received_.empty())
708 {
709 this->_receive(received_.front());
710 received_.pop_front();
711 ++items;
712 if (lock)
713 lock.reset();
714 }
715 return items;
716 }
717
718 void _init()
719 {
720 // set up reception of forwarded (via acoustic) subscriptions,
721 // then re-publish to driver threads
722 {
723 using intervehicle::protobuf::Subscription;
724 auto subscribe_lambda = [=](std::shared_ptr<const Subscription> d)
725 {
726 this->innermost()
728 intervehicle::protobuf::Subscription,
730 };
731 auto subscription = std::make_shared<
732 IntervehicleSerializationSubscription<Subscription, MarshallingScheme::DCCL>>(
733 subscribe_lambda);
734
735 auto dccl_id = SerializerParserHelper<Subscription, MarshallingScheme::DCCL>::id();
736 this->subscriptions_[dccl_id].insert(
737 std::make_pair(subscription->subscribed_group(), subscription));
738 }
739
740 this->innermost().template subscribe<intervehicle::groups::modem_data_in>(
741 [this](const intervehicle::protobuf::DCCLForwardedData& msg)
742 { received_.push_back(msg); });
743
744 // a message requiring ack can be disposed by either [1] ack, [2] expire (TTL exceeded), [3] having no subscribers, [4] queue size exceeded.
745 // post the correct callback (ack for [1] and expire for [2-4])
746 // and remove the pending ack message
747 using ack_pair_type = intervehicle::protobuf::AckMessagePair;
748 this->innermost().template subscribe<intervehicle::groups::modem_ack_in, ack_pair_type>(
749 [this](const ack_pair_type& ack_pair)
750 { this->template _handle_ack_or_expire<0>(ack_pair); });
751
752 using expire_pair_type = intervehicle::protobuf::ExpireMessagePair;
753 this->innermost()
754 .template subscribe<intervehicle::groups::modem_expire_in, expire_pair_type>(
755 [this](const expire_pair_type& expire_pair)
756 { this->template _handle_ack_or_expire<1>(expire_pair); });
757
758 this->innermost().template subscribe<intervehicle::groups::modem_driver_ready, bool>(
759 [this](const bool& ready)
760 {
761 goby::glog.is_debug1() && goby::glog << "Received driver ready" << std::endl;
762 ++drivers_ready_;
763 });
764
765 // set up before drivers ready to ensure we don't miss subscriptions
766 if (cfg_.has_persist_subscriptions())
767 _set_up_persistent_subscriptions();
768
769 for (int i = 0, n = cfg_.link_size(); i < n; ++i)
770 {
771 auto* link = cfg_.mutable_link(i);
772
773 link->mutable_driver()->set_modem_id(link->modem_id());
774 link->mutable_mac()->set_modem_id(link->modem_id());
775
776 modem_drivers_.emplace_back(new ModemDriverData);
777 ModemDriverData& data = *modem_drivers_.back();
778
779 data.underlying_thread.reset(new std::thread(
780 [&data, link]()
781 {
782 try
783 {
784 data.modem_driver_thread.reset(new intervehicle::ModemDriverThread(*link));
785 data.modem_driver_thread->run(data.driver_thread_alive);
786 }
787 catch (std::exception& e)
788 {
790 goby::glog << "Modem driver thread had uncaught exception: " << e.what()
791 << std::endl;
792 throw;
793 }
794 }));
795
796 if (goby::glog.buf().is_gui())
797 // allows for visual grouping of each link in the NCurses gui
798 std::this_thread::sleep_for(std::chrono::milliseconds(250));
799 }
800
801 while (drivers_ready_ < modem_drivers_.size())
802 {
803 goby::glog.is_debug1() && goby::glog << "Waiting for drivers to be ready." << std::endl;
804 this->poll();
805 std::this_thread::sleep_for(std::chrono::seconds(1));
806 }
807
808 // write subscriptions after drivers ready to ensure they aren't missed
809 if (former_sub_collection_.subscription_size() > 0)
810 {
812 goby::glog << "Begin loading subscriptions from persistent storage..." << std::endl;
813 for (const auto& sub : former_sub_collection_.subscription())
814 this->innermost()
815 .template publish<intervehicle::groups::modem_subscription_forward_rx,
816 intervehicle::protobuf::Subscription,
817 MarshallingScheme::PROTOBUF>(sub);
818 }
819 }
820
821 void _set_up_persistent_subscriptions()
822 {
823 const auto& dir = cfg_.persist_subscriptions().dir();
824 if (dir.empty())
825 goby::glog.is_die() && goby::glog << "persist_subscriptions.dir cannot be empty"
826 << std::endl;
827
828 std::stringstream file_name;
829 file_name << dir;
830 if (dir.back() != '/')
831 file_name << "/";
832 file_name << "goby_intervehicle_subscriptions_" << cfg_.persist_subscriptions().name()
833 << ".pb.txt";
834 persist_sub_file_name_ = file_name.str();
835 {
836 std::ifstream persist_sub_ifs(persist_sub_file_name_.c_str());
837 try
838 {
839 if (persist_sub_ifs.is_open())
840 {
841 google::protobuf::TextFormat::Parser parser;
842 google::protobuf::io::IstreamInputStream iis(&persist_sub_ifs);
843 parser.Parse(&iis, &former_sub_collection_);
844 }
845 else
846 {
848 goby::glog << "Could not open persistent subscriptions file: "
849 << persist_sub_file_name_
850 << ". Assuming no persistent subscriptions exist" << std::endl;
851 }
852 }
853 catch (const std::exception& e)
854 {
856 goby::glog << "Error reading persistent subscriptions file: " << e.what()
857 << std::endl;
858 }
859 }
860
861 std::ofstream persist_sub_ofs(persist_sub_file_name_.c_str());
862 if (!persist_sub_ofs.is_open())
863 {
864 goby::glog.is_die() &&
865 goby::glog << "Could not open persistent subscriptions file for writing: "
866 << persist_sub_file_name_ << std::endl;
867 }
868 remove(persist_sub_file_name_.c_str());
869
870 this->innermost().template subscribe<intervehicle::groups::subscription_report>(
871 [this](const intervehicle::protobuf::SubscriptionReport& report)
872 {
873 goby::glog.is_debug1() && goby::glog << "Received subscription report: "
874 << report.ShortDebugString() << std::endl;
875 sub_reports_[report.link_modem_id()] = report;
876 std::ofstream persist_sub_ofs(persist_sub_file_name_.c_str());
877 intervehicle::protobuf::SubscriptionPersistCollection collection;
878 collection.set_time_with_units(
879 goby::time::SystemClock::now<goby::time::MicroTime>());
880 for (auto report_p : sub_reports_)
881 {
882 for (const auto& sub : report_p.second.subscription())
883 *collection.add_subscription() = sub;
884 }
885 google::protobuf::TextFormat::Printer printer;
886 google::protobuf::io::OstreamOutputStream oos(&persist_sub_ofs);
888 goby::glog << "Collection: " << collection.ShortDebugString() << std::endl;
889 printer.Print(collection, &oos);
890 });
891 }
892
893 private:
894 intervehicle::protobuf::PortalConfig cfg_;
895
// Per-link bookkeeping for one modem driver thread owned by the portal.
896 struct ModemDriverData
897 {
    // the std::thread running the driver; joined in the portal destructor
898 std::unique_ptr<std::thread> underlying_thread;
    // the driver object itself; created inside the thread's entry lambda
899 std::unique_ptr<intervehicle::ModemDriverThread> modem_driver_thread;
    // passed to ModemDriverThread::run(); the portal destructor sets it to false before joining
900 std::atomic<bool> driver_thread_alive{true};
901 };
902 std::vector<std::unique_ptr<ModemDriverData>> modem_drivers_;
903 unsigned drivers_ready_{0};
904
905 std::deque<intervehicle::protobuf::DCCLForwardedData> received_;
906
907 intervehicle::protobuf::SubscriptionPersistCollection former_sub_collection_;
908 std::string persist_sub_file_name_;
909 std::map<modem_id_type, intervehicle::protobuf::SubscriptionReport> sub_reports_;
910};
911} // namespace middleware
912} // namespace goby
913
914#endif
simple exception class for goby applications
Definition exception.h:35
static const ::PROTOBUF_NAMESPACE_ID::Descriptor * descriptor()
Definition buffer.pb.h:110
boost::units::unit< ttl_dimension, boost::units::si::system > ttl_unit
Definition buffer.pb.h:302
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:60
static constexpr std::uint32_t broadcast_group
Special group number representing the broadcast group (used when no grouping is required for a given ...
Definition group.h:63
static constexpr std::uint32_t invalid_numeric_group
Special group number representing an invalid numeric group (unsuitable for intervehicle and outer lay...
Definition group.h:65
Implements the forwarder concept for the intervehicle layer.
InterVehicleForwarder(InnerTransporter &inner)
Construct a forwarder for the intervehicle layer.
Implements a portal for the intervehicle layer based on Goby Acomms.
InterVehiclePortal(InnerTransporter &inner, const intervehicle::protobuf::PortalConfig &cfg)
Instantiate a portal with the given configuration and a reference to an external inner transporter.
InterVehiclePortal(const intervehicle::protobuf::PortalConfig &cfg)
Instantiate a portal with the given configuration (with the portal owning the inner transporter)
Base class for implementing transporters (both portal and forwarder) for the intervehicle layer.
std::shared_ptr< intervehicle::protobuf::Subscription > _serialize_subscription(const Group &group, const Subscriber< Data > &subscriber, SubscriptionAction action)
static constexpr int scheme()
returns the marshalling scheme id for a given data type on this layer. Only MarshallingScheme::DCCL i...
void publish_dynamic(std::shared_ptr< Data > data, const Group &group=Group(), const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to mutable data variant)....
void subscribe_dynamic(std::function< void(std::shared_ptr< const Data >)> f, const Group &group=Group(), const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (shared pointer variant)....
void subscribe_dynamic(std::function< void(const Data &)> f, const Group &group=Group(), const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (const reference variant)....
void _handle_ack_or_expire(const AckorExpirePair &ack_or_expire_pair)
void publish_dynamic(std::shared_ptr< const Data > data, const Group &group=Group(), const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to const data variant)....
InterVehicleTransporterBase(InnerTransporter &inner)
std::shared_ptr< intervehicle::protobuf::Subscription > _set_up_subscribe(std::function< void(std::shared_ptr< const Data > d)> func, const Group &group, const Subscriber< Data > &subscriber, SubscriptionAction action)
void _receive(const intervehicle::protobuf::DCCLForwardedData &packets)
void _insert_pending_ack(int dccl_id, std::shared_ptr< goby::middleware::protobuf::SerializerTransporterMessage > data, std::shared_ptr< SerializationHandlerBase< intervehicle::protobuf::AckData > > ack_handler, std::shared_ptr< SerializationHandlerBase< intervehicle::protobuf::ExpireData > > expire_handler)
void publish_dynamic(const Data &data, const Group &group=Group(), const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (const reference variant)....
void unsubscribe_dynamic(const Group &group=Group(), const Subscriber< Data > &subscriber=Subscriber< Data >())
Unsubscribe from a specific run-time defined group and data type. Where possible, prefer the static v...
void check_validity()
Check validity of the Group for intervehicle use (at compile time)
std::unordered_map< int, std::unordered_map< std::string, std::shared_ptr< const SerializationHandlerBase< intervehicle::protobuf::Header > > > > subscriptions_
std::shared_ptr< goby::middleware::protobuf::SerializerTransporterMessage > _set_up_publish(const Data &d, const Group &group, const Publisher< Data > &publisher)
Represents a subscription to a serialized data type (intervehicle layer).
InvalidPublication(const std::string &e)
InvalidSubscription(const std::string &e)
InvalidUnsubscription(const std::string &e)
int poll(const std::chrono::time_point< Clock, Duration > &timeout=std::chrono::time_point< Clock, Duration >::max())
poll for data. Blocks until a data event occurs or a timeout when a particular time has been reached
Definition interface.h:345
Utility class for allowing the various Goby middleware transporters to poll the underlying transport ...
Definition poller.h:38
Represents a callback for a published data type (e.g. acked_func or expired_func)
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition publisher.h:40
const goby::middleware::protobuf::TransporterConfig & cfg() const
Returns the metadata configuration.
Definition publisher.h:81
acked_func_type acked_func() const
Returns the acked data callback (or an empty function if none is set)
Definition publisher.h:91
bool has_set_group_func() const
Definition publisher.h:95
expired_func_type expired_func() const
Returns the expired data callback (or an empty function if none is set)
Definition publisher.h:93
Base class for handling posting callbacks for serialized data types (interprocess and outer)
Defines the common interface for publishing and subscribing data using static (constexpr) groups on G...
Definition interface.h:204
void subscribe(std::function< void(const Data &)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (const reference variant)
Definition interface.h:270
void publish(const Data &data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (const reference variant)
Definition interface.h:215
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
Definition subscriber.h:37
subscribed_func_type subscribed_func() const
Definition subscriber.h:92
const goby::middleware::protobuf::TransporterConfig & cfg() const
Definition subscriber.h:80
subscribe_expired_func_type subscribe_expired_func() const
Definition subscriber.h:94
goby::acomms::DynamicBuffer< buffer_data_type >::modem_id_type modem_id_type
const ::goby::middleware::intervehicle::protobuf::Header & header() const
const ::goby::middleware::intervehicle::protobuf::DCCLPacket & frame(int index) const
const ::goby::middleware::intervehicle::protobuf::PortalConfig_PersistSubscriptions & persist_subscriptions() const
::goby::middleware::intervehicle::protobuf::PortalConfig_LinkConfig * mutable_link(int index)
static const ::PROTOBUF_NAMESPACE_ID::Descriptor * descriptor()
const ::goby::acomms::protobuf::DynamicBufferConfig & buffer() const
void set_protobuf_name(ArgT0 &&arg0, ArgT... args)
const ::goby::middleware::intervehicle::protobuf::TransporterConfig & intervehicle() const
goby::util::logger::GroupSetter group(std::string n)
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::goby::acomms::protobuf::ModemReport, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::acomms::iridium::protobuf::Report >, 11, false > report
constexpr Group modem_subscription_forward_tx
Definition groups.h:48
constexpr Group modem_subscription_forward_rx
Definition groups.h:50
std::shared_ptr< goby::middleware::protobuf::SerializerTransporterMessage > serialize_publication(const Data &d, const Group &group, const Publisher< Data > &publisher)
constexpr T e
Definition constants.h:35
The global namespace for the Goby project.
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::PROTOBUF_NAMESPACE_ID::MessageOptions, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
util::FlexOstream glog
Access the Goby logger through this object.
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
Definition interface.h:98
static std::string type_name()
The marshalling scheme specific string name for this type.
Definition interface.h:107
#define GOBY_INTERVEHICLE_API_VERSION
Definition version.h:35