Goby3  3.1.5a
2024.05.23
interthread.h
Go to the documentation of this file.
1 // Copyright 2016-2021:
2 // GobySoft, LLC (2013-)
3 // Community contributors (see AUTHORS file)
4 // File authors:
5 // Toby Schneider <toby@gobysoft.org>
6 //
7 //
8 // This file is part of the Goby Underwater Autonomy Project Libraries
9 // ("The Goby Libraries").
10 //
11 // The Goby Libraries are free software: you can redistribute them and/or modify
12 // them under the terms of the GNU Lesser General Public License as published by
13 // the Free Software Foundation, either version 2.1 of the License, or
14 // (at your option) any later version.
15 //
16 // The Goby Libraries are distributed in the hope that they will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU Lesser General Public License for more details.
20 //
21 // You should have received a copy of the GNU Lesser General Public License
22 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
23 
24 #ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERTHREAD_H
25 #define GOBY_MIDDLEWARE_TRANSPORT_INTERTHREAD_H
26 
27 #include <functional> // for fun...
28 #include <memory> // for sha...
29 #include <mutex> // for mutex
30 #include <thread> // for get_id
31 
32 #include "goby/exception.h" // for Exc...
33 #include "goby/middleware/group.h" // for Group
34 #include "goby/middleware/marshalling/interface.h" // for Mar...
36 #include "goby/middleware/transport/interface.h" // for Sta...
37 #include "goby/middleware/transport/null.h" // for Nul...
38 #include "goby/middleware/transport/poller.h" // for Poller
39 #include "goby/middleware/transport/publisher.h" // for Pub...
40 #include "goby/middleware/transport/subscriber.h" // for Sub...
41 
42 namespace goby
43 {
44 namespace middleware
45 {
57  : public StaticTransporterInterface<InterThreadTransporter, NullTransporter>,
58  public Poller<InterThreadTransporter>
59 {
60  private:
61  struct EmptyMessage
62  {
63  };
64 
65  public:
66  InterThreadTransporter() : data_mutex_(std::make_shared<std::mutex>()) {}
67 
69  {
70  detail::SubscriptionStoreBase::unsubscribe_all(std::this_thread::get_id());
71  detail::SubscriptionStoreBase::remove(std::this_thread::get_id());
72  }
73 
75  template <typename Data> static constexpr int scheme() { return MarshallingScheme::CXX_OBJECT; }
76 
78  template <const Group& group> void check_validity()
79  {
80  static_assert((group.c_str() != nullptr) && (group.c_str()[0] != '\0'),
81  "goby::middleware::Group must have non-zero length string to publish on the "
82  "InterThread layer");
83  }
84 
87  {
88  if ((group.c_str() == nullptr) || (group.c_str()[0] == '\0'))
89  throw(goby::Exception("Group must have a non-empty string for use on InterThread"));
90  }
91 
99  template <typename Data, int scheme = scheme<Data>()>
100  void publish_dynamic(const Data& data, const Group& group,
101  const Publisher<Data>& publisher = Publisher<Data>())
102  {
104  std::shared_ptr<Data> data_ptr(new Data(data));
105  publish_dynamic<Data>(data_ptr, group, publisher);
106  }
107 
115  template <typename Data, int scheme = scheme<Data>()>
116  void publish_dynamic(std::shared_ptr<const Data> data, const Group& group,
117  const Publisher<Data>& publisher = Publisher<Data>())
118  {
121  }
122 
130  template <typename Data, int scheme = scheme<Data>()>
131  void publish_dynamic(std::shared_ptr<Data> data, const Group& group,
132  const Publisher<Data>& publisher = Publisher<Data>())
133  {
134  publish_dynamic<Data, scheme>(std::shared_ptr<const Data>(data), group, publisher);
135  }
136 
138  template <const Group& group> void publish_empty()
139  {
140  publish_dynamic<EmptyMessage>(
141  std::shared_ptr<EmptyMessage>(std::make_shared<EmptyMessage>()), group);
142  }
143 
150  template <typename Data, int scheme = scheme<Data>()>
151  void subscribe_dynamic(std::function<void(const Data&)> f, const Group& group,
152  const Subscriber<Data>& /*subscriber*/ = Subscriber<Data>())
153  {
155  detail::SubscriptionStore<Data>::subscribe([=](std::shared_ptr<const Data> pd) { f(*pd); },
156  group, std::this_thread::get_id(), data_mutex_,
159  }
160 
167  template <typename Data, int scheme = scheme<Data>()>
168  void subscribe_dynamic(std::function<void(std::shared_ptr<const Data>)> f, const Group& group,
169  const Subscriber<Data>& /*subscriber*/ = Subscriber<Data>())
170  {
173  f, group, std::this_thread::get_id(), data_mutex_, Poller<InterThreadTransporter>::cv(),
175  }
176 
178  template <const Group& group> void subscribe_empty(const std::function<void()>& f)
179  {
180  subscribe_dynamic<EmptyMessage>([=](const std::shared_ptr<const EmptyMessage>&) { f(); },
181  group);
182  }
183 
189  template <typename Data, int scheme = scheme<Data>()>
191  const Subscriber<Data>& /*subscriber*/ = Subscriber<Data>())
192  {
194  detail::SubscriptionStore<Data>::unsubscribe(group, std::this_thread::get_id());
195  }
196 
199  {
200  detail::SubscriptionStoreBase::unsubscribe_all(std::this_thread::get_id());
201  }
202 
203  private:
205  int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock)
206  {
207  return detail::SubscriptionStoreBase::poll_all(std::this_thread::get_id(), lock);
208  }
209 
210  private:
211  // protects this thread's DataQueue
212  std::shared_ptr<std::mutex> data_mutex_;
213 };
214 
215 } // namespace middleware
216 } // namespace goby
217 
218 #endif
goby::middleware::InterThreadTransporter::subscribe_empty
void subscribe_empty(const std::function< void()> &f)
Subscribe with no data (used to receive a signal from another thread)
Definition: interthread.h:178
goby::middleware::StaticTransporterInterface
Defines the common interface for publishing and subscribing data using static (constexpr) groups on G...
Definition: interface.h:203
goby::middleware::MarshallingScheme::CXX_OBJECT
@ CXX_OBJECT
Definition: interface.h:57
interface.h
goby
The global namespace for the Goby project.
Definition: acomms_constants.h:33
goby::middleware::Subscriber
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
Definition: subscriber.h:36
group.h
goby::util::logger::mutex
std::recursive_mutex mutex
goby::middleware::InterThreadTransporter::InterThreadTransporter
InterThreadTransporter()
Definition: interthread.h:66
goby::middleware::Poller
Utility class for allowing the various Goby middleware transporters to poll the underlying transport ...
Definition: poller.h:37
group
goby::util::logger::GroupSetter group(std::string n)
Definition: logger_manipulators.h:134
goby::middleware::Publisher
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition: driver_thread.h:69
goby::middleware::InterThreadTransporter
A transporter for the interthread layer.
Definition: interthread.h:56
subscriber.h
goby::util::logger_lock::lock
@ lock
Definition: flex_ostreambuf.h:62
goby::middleware::detail::SubscriptionStore::unsubscribe
static void unsubscribe(const Group &group, std::thread::id thread_id)
Definition: subscription_store.h:163
goby::middleware::InterThreadTransporter::publish_dynamic
void publish_dynamic(const Data &data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (const reference variant)....
Definition: interthread.h:100
null.h
goby::middleware::InterThreadTransporter::~InterThreadTransporter
virtual ~InterThreadTransporter()
Definition: interthread.h:68
goby::middleware::detail::SubscriptionStore::subscribe
static void subscribe(std::function< void(std::shared_ptr< const Data >)> func, const Group &group, std::thread::id thread_id, std::shared_ptr< std::mutex > data_mutex, std::shared_ptr< std::condition_variable_any > cv, std::shared_ptr< std::timed_mutex > poller_mutex)
Definition: subscription_store.h:130
goby::middleware::InterThreadTransporter::subscribe_dynamic
void subscribe_dynamic(std::function< void(std::shared_ptr< const Data >)> f, const Group &group, const Subscriber< Data > &=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (shared pointer variant)....
Definition: interthread.h:168
goby::middleware::detail::SubscriptionStore::publish
static void publish(std::shared_ptr< const Data > data, const Group &group, const Publisher< Data > &publisher)
Definition: subscription_store.h:191
goby::middleware::InterThreadTransporter::check_validity_runtime
void check_validity_runtime(const Group &group)
Check validity of the Group for interthread use (for DynamicGroup at run time)
Definition: interthread.h:86
goby::middleware::detail::SubscriptionStoreBase::unsubscribe_all
static void unsubscribe_all(std::thread::id thread_id)
Definition: subscription_store.h:77
publisher.h
goby::middleware::InterThreadTransporter::publish_dynamic
void publish_dynamic(std::shared_ptr< Data > data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to mutable data variant)....
Definition: interthread.h:131
goby::middleware::detail::SubscriptionStoreBase::poll_all
static int poll_all(std::thread::id thread_id, std::unique_ptr< std::unique_lock< std::timed_mutex >> &lock)
Definition: subscription_store.h:60
goby::middleware::Group
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition: group.h:59
goby::middleware::PollerInterface::poll_mutex
std::shared_ptr< std::timed_mutex > poll_mutex()
access the mutex used for poll synchronization
Definition: interface.h:160
goby::middleware::InterThreadTransporter::scheme
static constexpr int scheme()
Scheme for interthread is always MarshallingScheme::CXX_OBJECT as the data are not serialized,...
Definition: interthread.h:75
goby::middleware::PollerInterface::cv
std::shared_ptr< std::condition_variable_any > cv()
access the condition variable used for poll synchronization
Definition: interface.h:166
goby::middleware::InterThreadTransporter::check_validity
void check_validity()
Check validity of the Group for interthread use (at compile time)
Definition: interthread.h:78
interface.h
goby::middleware::InterThreadTransporter::publish_empty
void publish_empty()
Publish with no data (used to signal another thread)
Definition: interthread.h:138
goby::middleware::InterThreadTransporter::subscribe_dynamic
void subscribe_dynamic(std::function< void(const Data &)> f, const Group &group, const Subscriber< Data > &=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (const reference variant)....
Definition: interthread.h:151
goby::Exception
simple exception class for goby applications
Definition: exception.h:34
exception.h
goby::middleware::detail::SubscriptionStoreBase::remove
static void remove(std::thread::id thread_id)
Definition: subscription_store.h:86
subscription_store.h
poller.h
goby::middleware::InterThreadTransporter::unsubscribe_all
void unsubscribe_all()
Unsubscribe from all current subscriptions.
Definition: interthread.h:198
goby::middleware::InterThreadTransporter::unsubscribe_dynamic
void unsubscribe_dynamic(const Group &group, const Subscriber< Data > &=Subscriber< Data >())
Unsubscribe to a specific run-time defined group and data type. Where possible, prefer the static var...
Definition: interthread.h:190
goby::middleware::InterThreadTransporter::publish_dynamic
void publish_dynamic(std::shared_ptr< const Data > data, const Group &group, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to const data variant)....
Definition: interthread.h:116