Goby3  3.1.4
2024.02.22
thread.h
Go to the documentation of this file.
1 // Copyright 2016-2023:
2 // GobySoft, LLC (2013-)
3 // Community contributors (see AUTHORS file)
4 // File authors:
5 // Toby Schneider <toby@gobysoft.org>
6 // James D. Turner <james.turner@nrl.navy.mil>
7 //
8 //
9 // This file is part of the Goby Underwater Autonomy Project Libraries
10 // ("The Goby Libraries").
11 //
12 // The Goby Libraries are free software: you can redistribute them and/or modify
13 // them under the terms of the GNU Lesser General Public License as published by
14 // the Free Software Foundation, either version 2.1 of the License, or
15 // (at your option) any later version.
16 //
17 // The Goby Libraries are distributed in the hope that they will be useful,
18 // but WITHOUT ANY WARRANTY; without even the implied warranty of
19 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 // GNU Lesser General Public License for more details.
21 //
22 // You should have received a copy of the GNU Lesser General Public License
23 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
24 
25 #ifndef GOBY_MIDDLEWARE_APPLICATION_THREAD_H
26 #define GOBY_MIDDLEWARE_APPLICATION_THREAD_H
27 
28 #include <atomic>
29 #include <chrono>
30 #include <memory>
31 #include <mutex>
32 #include <typeindex>
33 
34 #include <boost/units/systems/si.hpp>
35 
36 #include "goby/exception.h"
39 
40 #include "goby/middleware/common.h"
41 #include "goby/middleware/group.h"
42 #include "goby/time/simulation.h"
43 
44 namespace goby
45 {
46 namespace middleware
47 {
/// \brief Selects which thread(s) a message published to Thread::shutdown_group_ applies to.
///
/// A Thread quits when it receives an identifier that either has all_threads set, or
/// whose type_i and index both match the thread's own type_index() and index().
struct ThreadIdentifier
{
    /// Type of the targeted thread; defaults to typeid(void).
    std::type_index type_i{std::type_index(typeid(void))};
    /// Index of the targeted thread instance; defaults to -1.
    int index{-1};
    /// When true the identifier matches every thread, whatever type_i and index hold.
    bool all_threads{false};
};
54 
60 template <typename Config, typename TransporterType> class Thread
61 {
62  private:
63  TransporterType* transporter_{nullptr};
64 
65  boost::units::quantity<boost::units::si::frequency> loop_frequency_;
66  std::chrono::steady_clock::time_point loop_time_;
67  unsigned long long loop_count_{0};
68  const Config cfg_;
69  int index_;
70  std::atomic<bool>* alive_{nullptr};
71  std::type_index type_i_{std::type_index(typeid(void))};
72 
73  // Linux/Apple TID, from gettid()
74 #ifdef __APPLE__
75  uint64_t thread_id_;
76 #else
77  int thread_id_;
78 #endif
79  // demangled class name / index
80  std::string thread_name_;
81 
82  // unique id value used by MultiThreadApplication
83  int uid_;
84 
85  bool finalize_run_{false};
86 
87  public:
88  using Transporter = TransporterType;
89 
96  Thread(const Config& cfg, TransporterType* transporter, int index)
97  : Thread(cfg, transporter, 0 * boost::units::si::hertz, index)
98  {
99  }
100 
107  Thread(const Config& cfg, TransporterType* transporter, double loop_freq_hertz = 0,
108  int index = -1)
109  : Thread(cfg, transporter, loop_freq_hertz * boost::units::si::hertz, index)
110  {
111  }
112 
119  Thread(const Config& cfg, TransporterType* transporter,
120  boost::units::quantity<boost::units::si::frequency> loop_freq, int index = -1)
121  : Thread(cfg, loop_freq, index)
122  {
124  }
125 
126  virtual ~Thread() {}
127 
131  void run(std::atomic<bool>& alive)
132  {
133  alive_ = &alive;
134  do_subscribe();
135  initialize();
136  while (alive) { run_once(); }
137  if (!finalize_run_)
138  {
139  finalize();
140  finalize_run_ = true;
141  }
142  }
143 
145  int index() const { return index_; }
146 
147  std::type_index type_index() { return type_i_; }
148  void set_type_index(std::type_index type_i) { type_i_ = type_i; }
149 
150  std::string name() { return thread_name_; }
151  void set_name(const std::string& name) { thread_name_ = name; }
152 
153  int uid() { return uid_; }
154  void set_uid(int uid) { uid_ = uid; }
155 
156  static constexpr goby::middleware::Group shutdown_group_{"goby::middleware::Thread::shutdown"};
157  static constexpr goby::middleware::Group joinable_group_{"goby::middleware::Thread::joinable"};
158 
159  protected:
160  Thread(const Config& cfg, boost::units::quantity<boost::units::si::frequency> loop_freq,
161  int index = -1)
162  : loop_frequency_(loop_freq),
163  loop_time_(std::chrono::steady_clock::now()),
164  cfg_(cfg),
165  index_(index),
166  thread_id_(goby::middleware::gettid()),
167  thread_name_(std::to_string(thread_id_)),
168  uid_(-1)
169  {
170  if (loop_frequency_hertz() > 0 &&
171  loop_frequency_hertz() != std::numeric_limits<double>::infinity())
172  {
173  unsigned long long microsec_interval =
175 
176  unsigned long long ticks_since_epoch =
177  std::chrono::duration_cast<std::chrono::microseconds>(loop_time_.time_since_epoch())
178  .count() /
179  microsec_interval;
180 
181  loop_time_ = std::chrono::steady_clock::time_point(
182  std::chrono::microseconds((ticks_since_epoch + 1) * microsec_interval));
183  }
184  }
185 
186  void set_transporter(TransporterType* transporter) { transporter_ = transporter; }
187 
188  virtual void loop()
189  {
190  throw(std::runtime_error(
191  "void Thread::loop() must be overridden for non-zero loop frequencies"));
192  }
193 
194  double loop_frequency_hertz() const { return loop_frequency_ / boost::units::si::hertz; }
195  decltype(loop_frequency_) loop_frequency() const { return loop_frequency_; }
196  double loop_max_frequency() const { return std::numeric_limits<double>::infinity(); }
197  void run_once();
198 
199  TransporterType& transporter() const { return *transporter_; }
200 
201  const Config& cfg() const { return cfg_; }
202 
203  // called after alive() is true, but before run()
204  virtual void initialize() {}
205 
206  // called after alive() is false
207  virtual void finalize() {}
208 
210  {
211  health.set_name(thread_name_);
212 #ifdef __APPLE__
213  health.set_thread_id_apple(thread_id_);
214 #else
215  health.set_thread_id(thread_id_);
216 #endif
217  if (uid_ >= 0)
218  health.set_uid(uid_);
219  this->health(health);
220  }
221 
226  {
228  }
229 
230  void thread_quit()
231  {
232  (*alive_) = false;
233  if (!finalize_run_)
234  {
235  finalize();
236  finalize_run_ = true;
237  }
238  }
239 
240  bool alive() { return alive_ && *alive_; }
241 
242  private:
243  void do_subscribe()
244  {
245  if (!transporter_)
246  {
247  throw(goby::Exception(
248  "Thread::transporter_ is null. Must set_transporter() before using"));
249  }
250 
251  transporter()
252  .innermost()
253  .template subscribe<shutdown_group_, ThreadIdentifier, MarshallingScheme::CXX_OBJECT>(
254  [this](const ThreadIdentifier ti) {
255  if (ti.all_threads ||
256  (ti.type_i == this->type_index() && ti.index == this->index()))
257  {
258  this->thread_quit();
259  }
260  });
261  }
262 };
263 
264 } // namespace middleware
265 
266 template <typename Config, typename TransporterType>
267 constexpr goby::middleware::Group
269 
270 template <typename Config, typename TransporterType>
271 constexpr goby::middleware::Group
273 
274 template <typename Config, typename TransporterType>
276 {
277  if (!transporter_)
278  throw(goby::Exception("Null transporter"));
279 
280  if (loop_frequency_hertz() == std::numeric_limits<double>::infinity())
281  {
282  // call loop as fast as possible
283  transporter_->poll(std::chrono::seconds(0));
284  loop();
285  }
286  else if (loop_frequency_hertz() > 0)
287  {
288  int events = transporter_->poll(loop_time_);
289 
290  // timeout
291  if (events == 0)
292  {
293  loop();
294  ++loop_count_;
295  loop_time_ += std::chrono::nanoseconds(
296  (unsigned long long)(1000000000ull / (loop_frequency_hertz() *
298  }
299  }
300  else
301  {
302  // don't call loop()
303  transporter_->poll();
304  }
305 }
306 } // namespace goby
307 
308 #endif
goby::middleware::Thread::initialize
virtual void initialize()
Definition: thread.h:204
goby::middleware::ThreadIdentifier
Definition: thread.h:48
goby::middleware::ThreadIdentifier::all_threads
bool all_threads
Definition: thread.h:52
goby::middleware::Thread::shutdown_group_
static constexpr goby::middleware::Group shutdown_group_
Definition: thread.h:156
goby::middleware::Thread::loop
virtual void loop()
Definition: thread.h:188
goby::middleware::InterVehicleForwarder< InterProcessPortal< InterThreadTransporter > >
goby::middleware::Thread::run_once
void run_once()
Definition: thread.h:275
interface.h
goby
The global namespace for the Goby project.
Definition: acomms_constants.h:33
goby::middleware::Thread::joinable_group_
static constexpr goby::middleware::Group joinable_group_
Definition: thread.h:157
group.h
goby::middleware::Thread::cfg
const Config & cfg() const
Definition: thread.h:201
goby::middleware::Thread::name
std::string name()
Definition: thread.h:150
goby::middleware::protobuf::ThreadHealth
Definition: coroner.pb.h:231
goby::middleware::Thread::finalize
virtual void finalize()
Definition: thread.h:207
goby::middleware::Thread::index
int index() const
Definition: thread.h:145
goby::middleware::Thread::loop_frequency_hertz
double loop_frequency_hertz() const
Definition: thread.h:194
boost
Definition: udp_driver.h:41
goby::middleware::Thread::Thread
Thread(const Config &cfg, TransporterType *transporter, double loop_freq_hertz=0, int index=-1)
Construct a thread with all possible metadata (using double to specify frequency in Hertz)
Definition: thread.h:107
goby::middleware::Thread::type_index
std::type_index type_index()
Definition: thread.h:147
goby::middleware::Thread::alive
bool alive()
Definition: thread.h:240
goby::middleware::Thread::set_transporter
void set_transporter(TransporterType *transporter)
Definition: thread.h:186
goby::middleware::Thread::Thread
Thread(const Config &cfg, TransporterType *transporter, int index)
Construct a thread with a given configuration, underlying transporter, and index (for multiple instan...
Definition: thread.h:96
goby::middleware::ThreadIdentifier::type_i
std::type_index type_i
Definition: thread.h:50
goby::middleware::Thread::loop_frequency
decltype(loop_frequency_) loop_frequency() const
Definition: thread.h:195
goby::middleware::Thread::set_uid
void set_uid(int uid)
Definition: thread.h:154
goby::middleware::Thread::uid
int uid()
Definition: thread.h:153
goby::middleware::Thread
Represents a thread of execution within the Goby middleware, interleaving periodic events (loop()) wi...
Definition: thread.h:60
goby::middleware::Thread::Thread
Thread(const Config &cfg, TransporterType *transporter, boost::units::quantity< boost::units::si::frequency > loop_freq, int index=-1)
Construct a thread with all possible metadata (using boost::units to specify frequency)
Definition: thread.h:119
goby::middleware::Thread::transporter
TransporterType & transporter() const
Definition: thread.h:199
goby::time::SimulatorSettings::warp_factor
static int warp_factor
Warp factor to speed up (or slow time) the time values returned by SteadyClock::now() and SystemClock...
Definition: simulation.h:40
goby::middleware::Thread::~Thread
virtual ~Thread()
Definition: thread.h:126
goby::middleware::Thread::thread_quit
void thread_quit()
Definition: thread.h:230
goby::middleware::Group
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition: group.h:58
goby::middleware::Thread::set_name
void set_name(const std::string &name)
Definition: thread.h:151
goby::middleware::Thread::run
void run(std::atomic< bool > &alive)
Run the thread until the boolean reference passed is set false. This call blocks, and should be run i...
Definition: thread.h:131
common.h
goby::middleware::Thread::Thread
Thread(const Config &cfg, boost::units::quantity< boost::units::si::frequency > loop_freq, int index=-1)
Definition: thread.h:160
goby::middleware::Thread::set_type_index
void set_type_index(std::type_index type_i)
Definition: thread.h:148
goby::middleware::protobuf::HEALTH__OK
@ HEALTH__OK
Definition: coroner.pb.h:87
goby::middleware::Thread::health
virtual void health(goby::middleware::protobuf::ThreadHealth &health)
Called when HealthRequest is made by goby_coroner.
Definition: thread.h:225
goby::middleware::Thread::loop_max_frequency
double loop_max_frequency() const
Definition: thread.h:196
simulation.h
goby::Exception
simple exception class for goby applications
Definition: exception.h:34
exception.h
goby::middleware::to_string
std::string to_string(goby::middleware::protobuf::Layer layer)
Definition: common.h:44
goby::middleware::ThreadIdentifier::index
int index
Definition: thread.h:51
goby::middleware::Thread::thread_health
void thread_health(goby::middleware::protobuf::ThreadHealth &health)
Definition: thread.h:209
coroner.pb.h