Goby3 3.2.3
2025.05.13
Loading...
Searching...
No Matches
thread.h
Go to the documentation of this file.
1// Copyright 2016-2023:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6// James D. Turner <james.turner@nrl.navy.mil>
7//
8//
9// This file is part of the Goby Underwater Autonomy Project Libraries
10// ("The Goby Libraries").
11//
12// The Goby Libraries are free software: you can redistribute them and/or modify
13// them under the terms of the GNU Lesser General Public License as published by
14// the Free Software Foundation, either version 2.1 of the License, or
15// (at your option) any later version.
16//
17// The Goby Libraries are distributed in the hope that they will be useful,
18// but WITHOUT ANY WARRANTY; without even the implied warranty of
19// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20// GNU Lesser General Public License for more details.
21//
22// You should have received a copy of the GNU Lesser General Public License
23// along with Goby. If not, see <http://www.gnu.org/licenses/>.
24
25#ifndef GOBY_MIDDLEWARE_APPLICATION_THREAD_H
26#define GOBY_MIDDLEWARE_APPLICATION_THREAD_H
27
28#include <atomic>
29#include <chrono>
30#include <memory>
31#include <mutex>
32#include <typeindex>
33
34#include <boost/units/systems/si.hpp>
35
36#include "goby/exception.h"
39
43
44namespace goby
45{
46namespace middleware
47{
49{
// NOTE(review): the declaration line (listing line 48) is absent from this extract.
// These fields are read as a ThreadIdentifier by the shutdown handler in
// Thread::do_subscribe() to decide whether a quit request targets this thread.

// Type of the targeted thread; defaults to typeid(void).
50 std::type_index type_i{std::type_index(typeid(void))};
// Index of the targeted thread instance; defaults to -1.
51 int index{-1};
// When true, the shutdown handler matches regardless of type_i and index.
52 bool all_threads{false};
53};
54
60template <typename Config, typename TransporterType> class Thread
61{
62 private:
// Transporter used for polling and subscriptions; nullptr until assigned via set_transporter().
63 TransporterType* transporter_{nullptr};
64
// Requested rate for loop() calls; read through loop_frequency() / loop_frequency_hertz().
65 boost::units::quantity<boost::units::si::frequency> loop_frequency_;
// Time point handed to transporter_->poll() for the next timed loop() call.
66 std::chrono::steady_clock::time_point loop_time_;
// Incremented after each loop() call that follows a poll timeout in run_once().
67 unsigned long long loop_count_{0};
// Copy of the configuration passed to the constructor; exposed via cfg().
68 const Config cfg_;
// Instance index given at construction; exposed via index().
69 int index_;
// Points at the flag passed to run(); nullptr before run() has been called.
70 std::atomic<bool>* alive_{nullptr};
// typeid(void) unless changed with set_type_index().
71 std::type_index type_i_{std::type_index(typeid(void))};
72
73 // Linux/Apple TID, from gettid()
74#ifdef __APPLE__
75 uint64_t thread_id_;
76#else
77 int thread_id_;
78#endif
79 // demangled class name / index
// (initialised to the decimal thread id by the constructor until set_name() is called)
80 std::string thread_name_;
81
82 // unique id value used by MultiThreadApplication
// (-1 until set_uid() is called; only reported in thread_health() when >= 0)
83 int uid_;
84
// Set once finalize() has been invoked so that it is not called a second time.
85 bool finalize_run_{false};
86
87 public:
88 using Transporter = TransporterType;
89
/// Construct a thread with a given configuration, underlying transporter, and
/// instance index. Delegates to another constructor overload with a loop
/// frequency of 0 Hz, for which run_once() polls without ever calling loop().
96 Thread(const Config& cfg, TransporterType* transporter, int index)
97 : Thread(cfg, transporter, 0 * boost::units::si::hertz, index)
98 {
99 }
100
/// Construct a thread with all possible metadata, using a double to specify
/// the loop frequency in Hertz (default 0) and an optional index (default -1).
/// The frequency is converted to a boost::units quantity and forwarded.
107 Thread(const Config& cfg, TransporterType* transporter, double loop_freq_hertz = 0,
108 int index = -1)
109 : Thread(cfg, transporter, loop_freq_hertz * boost::units::si::hertz, index)
110 {
111 }
112
/// Construct a thread with all possible metadata, using boost::units to
/// specify the loop frequency. Delegates member setup to the protected
/// (cfg, loop_freq, index) constructor.
// NOTE(review): listing line 123 (the only statement of this body) is absent
// from this extract. As shown, `transporter` is never stored, so transporter_
// would stay nullptr - confirm against the original header.
119 Thread(const Config& cfg, TransporterType* transporter,
120 boost::units::quantity<boost::units::si::frequency> loop_freq, int index = -1)
121 : Thread(cfg, loop_freq, index)
122 {
124 }
125
126 virtual ~Thread() {}
127
131 void run(std::atomic<bool>& alive)
132 {
133 alive_ = &alive;
134 do_subscribe();
135 initialize();
136 while (alive) { run_once(); }
137 if (!finalize_run_)
138 {
139 finalize();
140 finalize_run_ = true;
141 }
142 }
143
145 int index() const { return index_; }
146
147 std::type_index type_index() { return type_i_; }
148 void set_type_index(std::type_index type_i) { type_i_ = type_i; }
149
150 std::string name() { return thread_name_; }
151 void set_name(const std::string& name) { thread_name_ = name; }
152
153 int uid() { return uid_; }
154 void set_uid(int uid) { uid_ = uid; }
155
// Group subscribed to in do_subscribe(); a ThreadIdentifier received on it that
// matches this thread triggers thread_quit().
156 static constexpr goby::middleware::Group shutdown_group_{"goby::middleware::Thread::shutdown"};
// NOTE(review): not referenced anywhere in the visible part of this file; presumably
// used by the owning application when joining threads - confirm against callers.
157 static constexpr goby::middleware::Group joinable_group_{"goby::middleware::Thread::joinable"};
158
159 protected:
// Protected constructor that initialises all members except the transporter
// (which stays nullptr until set_transporter() is called).
// For a finite, positive loop frequency it also moves loop_time_ forward to the
// next whole multiple of the loop interval, counted in microseconds since the
// steady_clock epoch.
160 Thread(const Config& cfg, boost::units::quantity<boost::units::si::frequency> loop_freq,
161 int index = -1)
162 : loop_frequency_(loop_freq),
163 loop_time_(std::chrono::steady_clock::now()),
164 cfg_(cfg),
165 index_(index),
166 thread_id_(goby::middleware::gettid()),
167 thread_name_(std::to_string(thread_id_)),
168 uid_(-1)
169 {
170 if (loop_frequency_hertz() > 0 &&
171 loop_frequency_hertz() != std::numeric_limits<double>::infinity())
172 {
// NOTE(review): the initializer of microsec_interval (listing line 174) is
// absent from this extract. If it can evaluate to 0 (e.g. for very high
// frequencies), the integer division below is undefined - confirm against
// the original header.
173 unsigned long long microsec_interval =
175
// Number of whole intervals elapsed since the steady_clock epoch.
176 unsigned long long ticks_since_epoch =
177 std::chrono::duration_cast<std::chrono::microseconds>(loop_time_.time_since_epoch())
178 .count() /
179 microsec_interval;
180
// First loop() deadline: the start of the next interval.
181 loop_time_ = std::chrono::steady_clock::time_point(
182 std::chrono::microseconds((ticks_since_epoch + 1) * microsec_interval));
183 }
184 }
185
186 void set_transporter(TransporterType* transporter) { transporter_ = transporter; }
187
188 virtual void loop()
189 {
190 throw(std::runtime_error(
191 "void Thread::loop() must be overridden for non-zero loop frequencies"));
192 }
193
// Loop frequency as a plain double: the stored quantity divided by one hertz.
194 double loop_frequency_hertz() const { return loop_frequency_ / boost::units::si::hertz; }
// Loop frequency as the stored boost::units quantity.
195 decltype(loop_frequency_) loop_frequency() const { return loop_frequency_; }
// Returns +infinity; run_once() treats an infinite loop frequency as
// "call loop() as fast as possible".
196 double loop_max_frequency() const { return std::numeric_limits<double>::infinity(); }
// Performs one poll of the transporter and, depending on the loop frequency, one loop() call.
197 void run_once();
198
// Reference to the transporter. No null check is performed here, so a
// transporter must have been set before this is called.
199 TransporterType& transporter() const { return *transporter_; }
200
// Read-only access to the configuration copied at construction.
201 const Config& cfg() const { return cfg_; }
202
203 // Hook called once from run(), after do_subscribe() and before the first run_once().
// The default implementation does nothing.
204 virtual void initialize() {}
205
206 // Hook called at most once (guarded by finalize_run_): from run() after its loop
// exits, or earlier from the quit handler that clears the alive flag.
// The default implementation does nothing.
207 virtual void finalize() {}
208
210 {
// NOTE(review): the signature line (listing line 209) is absent from this extract.
// This body fills in the identifying fields of `health` and then passes it to
// the virtual health() hook.
211 health.set_name(thread_name_);
// thread_id_ has a different type on Apple, hence the separate setter.
212#ifdef __APPLE__
213 health.set_thread_id_apple(thread_id_);
214#else
215 health.set_thread_id(thread_id_);
216#endif
// uid_ stays at -1 until set_uid() is called; only report it once assigned.
217 if (uid_ >= 0)
218 health.set_uid(uid_);
219 this->health(health);
220 }
221
229
231 {
// NOTE(review): the signature line (listing line 230) is absent from this extract;
// presumably this is the body of thread_quit(), which the shutdown handler calls.
// Clears the alive flag and runs finalize() unless it has already been run.
// NOTE(review): alive_ is nullptr until run() has been called, so this
// dereference assumes run() has started - confirm callers guarantee that.
232 (*alive_) = false;
233 if (!finalize_run_)
234 {
235 finalize();
236 finalize_run_ = true;
237 }
238 }
239
240 bool alive() { return alive_ && *alive_; }
241
242 private:
// Subscribes this thread to shutdown_group_ so that a matching ThreadIdentifier
// makes it quit. Called from run() before initialize().
// Throws goby::Exception if no transporter has been set.
243 void do_subscribe()
244 {
245 if (!transporter_)
246 {
247 throw(goby::Exception(
248 "Thread::transporter_ is null. Must set_transporter() before using"));
249 }
250
// NOTE(review): listing line 251 (the object on which .innermost() is called)
// is absent from this extract.
252 .innermost()
253 .template subscribe<shutdown_group_, ThreadIdentifier, MarshallingScheme::CXX_OBJECT>(
254 [this](const ThreadIdentifier ti) {
// Quit when the request addresses every thread, or exactly this
// thread's type and index.
255 if (ti.all_threads ||
256 (ti.type_i == this->type_index() && ti.index == this->index()))
257 {
258 this->thread_quit();
259 }
260 });
261 }
262};
263
264} // namespace middleware
265
266template <typename Config, typename TransporterType>
269
270template <typename Config, typename TransporterType>
273
// One iteration of the thread: poll the transporter and, depending on the loop
// frequency, call loop().
//  - infinite frequency: non-blocking poll (zero timeout), then loop() every time
//  - positive frequency: poll until loop_time_; loop() only when the poll reports 0 events
//  - otherwise:          poll with no timeout argument, never call loop()
// Throws goby::Exception if no transporter has been set.
// NOTE(review): the declarator line (listing line 275) is absent from this extract;
// the class declares `void run_once();`.
274template <typename Config, typename TransporterType>
276{
277 if (!transporter_)
278 throw(goby::Exception("Null transporter"));
279
280 if (loop_frequency_hertz() == std::numeric_limits<double>::infinity())
281 {
282 // call loop as fast as possible
283 transporter_->poll(std::chrono::seconds(0));
284 loop();
285 }
286 else if (loop_frequency_hertz() > 0)
287 {
288 int events = transporter_->poll(loop_time_);
289
290 // timeout
291 if (events == 0)
292 {
293 loop();
294 ++loop_count_;
// Advance the deadline by one loop period, in nanoseconds.
// NOTE(review): listing line 297 (the rest of this expression) is absent
// from this extract.
295 loop_time_ += std::chrono::nanoseconds(
296 (unsigned long long)(1000000000ull / (loop_frequency_hertz() *
298 }
299 }
300 else
301 {
302 // don't call loop()
303 transporter_->poll();
304 }
305}
306} // namespace goby
307
308#endif
simple exception class for goby applications
Definition exception.h:35
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:60
Represents a thread of execution within the Goby middleware, interleaving periodic events (loop()) wi...
Definition thread.h:61
virtual void finalize()
Definition thread.h:207
Thread(const Config &cfg, boost::units::quantity< boost::units::si::frequency > loop_freq, int index=-1)
Definition thread.h:160
std::string name()
Definition thread.h:150
Thread(const Config &cfg, TransporterType *transporter, double loop_freq_hertz=0, int index=-1)
Construct a thread with all possible metadata (using double to specify frequency in Hertz)
Definition thread.h:107
static constexpr goby::middleware::Group shutdown_group_
Definition thread.h:156
virtual void initialize()
Definition thread.h:204
decltype(loop_frequency_) loop_frequency() const
Definition thread.h:195
static constexpr goby::middleware::Group joinable_group_
Definition thread.h:157
Thread(const Config &cfg, TransporterType *transporter, int index)
Construct a thread with a given configuration, underlying transporter, and index (for multiple instan...
Definition thread.h:96
double loop_max_frequency() const
Definition thread.h:196
void set_transporter(TransporterType *transporter)
Definition thread.h:186
void set_type_index(std::type_index type_i)
Definition thread.h:148
TransporterType & transporter() const
Definition thread.h:199
TransporterType Transporter
Definition thread.h:88
std::type_index type_index()
Definition thread.h:147
void set_uid(int uid)
Definition thread.h:154
void set_name(const std::string &name)
Definition thread.h:151
virtual void health(goby::middleware::protobuf::ThreadHealth &health)
Called when HealthRequest is made by goby_coroner.
Definition thread.h:225
Thread(const Config &cfg, TransporterType *transporter, boost::units::quantity< boost::units::si::frequency > loop_freq, int index=-1)
Construct a thread with all possible metadata (using boost::units to specify frequency)
Definition thread.h:119
virtual void loop()
Definition thread.h:188
void thread_health(goby::middleware::protobuf::ThreadHealth &health)
Definition thread.h:209
const Config & cfg() const
Definition thread.h:201
double loop_frequency_hertz() const
Definition thread.h:194
void run(std::atomic< bool > &alive)
Run the thread until the boolean reference passed is set false. This call blocks, and should be run i...
Definition thread.h:131
std::string to_string(goby::middleware::protobuf::Layer layer)
Definition common.h:44
The global namespace for the Goby project.
STL namespace.
static int warp_factor
Warp factor to speed up (or slow time) the time values returned by SteadyClock::now() and SystemClock...
Definition simulation.h:40