Goby3 3.2.3
2025.05.13
Loading...
Searching...
No Matches
multi_thread.h
Go to the documentation of this file.
1// Copyright 2017-2023:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6// James D. Turner <james.turner@nrl.navy.mil>
7//
8//
9// This file is part of the Goby Underwater Autonomy Project Libraries
10// ("The Goby Libraries").
11//
12// The Goby Libraries are free software: you can redistribute them and/or modify
13// them under the terms of the GNU Lesser General Public License as published by
14// the Free Software Foundation, either version 2.1 of the License, or
15// (at your option) any later version.
16//
17// The Goby Libraries are distributed in the hope that they will be useful,
18// but WITHOUT ANY WARRANTY; without even the implied warranty of
19// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20// GNU Lesser General Public License for more details.
21//
22// You should have received a copy of the GNU Lesser General Public License
23// along with Goby. If not, see <http://www.gnu.org/licenses/>.
24
25#ifndef GOBY_MIDDLEWARE_APPLICATION_MULTI_THREAD_H
26#define GOBY_MIDDLEWARE_APPLICATION_MULTI_THREAD_H
27
28#include <boost/core/demangle.hpp>
29#include <boost/units/systems/si.hpp>
30
35
36#include "goby/exception.h"
43
47
48namespace goby
49{
50namespace middleware
51{
61template <int i>
63 : public Thread<boost::units::quantity<boost::units::si::frequency>, InterThreadTransporter>,
64 public coroner::Thread<TimerThread<i>>
65{
66 using ThreadBase =
68
69 friend class coroner::Thread<TimerThread<i>>;
70
71 public:
72 static constexpr goby::middleware::Group expire_group{"goby::middleware::TimerThread::timer",
73 i};
74
75 TimerThread(const boost::units::quantity<boost::units::si::frequency>& freq)
76 : ThreadBase(freq, &interthread_, freq)
77 {
78 this->subscribe_coroner();
79 }
80
81 private:
82 void loop() override { interthread_.template publish_empty<expire_group>(); }
83
84 InterThreadTransporter& interthread() { return interthread_; }
85
86 private:
87 InterThreadTransporter interthread_;
88};
89
91
96template <class Config, class Transporter>
98 public goby::middleware::Thread<Config, Transporter>
99{
100 private:
101 struct ThreadManagement
102 {
103 ThreadManagement() = default;
104 ~ThreadManagement()
105 {
106 if (thread)
107 {
109 goby::glog << "Joining thread: " << name << std::endl;
110 alive = false;
111 thread->join();
112 }
113 }
114
115 std::atomic<bool> alive{true};
116 std::string name;
117 int uid;
118 std::unique_ptr<std::thread> thread;
119 };
120
121 static std::exception_ptr thread_exception_;
122
123 std::map<std::type_index, std::map<int, ThreadManagement>> threads_;
124 int thread_uid_{0};
125 int running_thread_count_{0};
126 InterThreadTransporter interthread_;
127
128 public:
129 template <typename ThreadType> void launch_thread()
130 {
131 _launch_thread<ThreadType, Config, false, true>(-1, this->app_cfg());
132 }
133 template <typename ThreadType> void launch_thread(int index)
134 {
135 _launch_thread<ThreadType, Config, true, true>(index, this->app_cfg());
136 }
137
138 template <typename ThreadType, typename ThreadConfig>
139 void launch_thread(const ThreadConfig& cfg)
140 {
141 _launch_thread<ThreadType, ThreadConfig, false, true>(-1, cfg);
142 }
143 template <typename ThreadType, typename ThreadConfig>
144 void launch_thread(int index, const ThreadConfig& cfg)
145 {
146 _launch_thread<ThreadType, ThreadConfig, true, true>(index, cfg);
147 }
148
149 template <typename ThreadType> void launch_thread_without_cfg()
150 {
151 _launch_thread<ThreadType, Config, false, false>(-1, this->app_cfg());
152 }
153 template <typename ThreadType> void launch_thread_without_cfg(int index)
154 {
155 _launch_thread<ThreadType, Config, true, false>(index, this->app_cfg());
156 }
157
158 template <typename ThreadType> void join_thread(int index = -1)
159 {
160 // request thread self-join
161 auto type_i = std::type_index(typeid(ThreadType));
162 ThreadIdentifier ti{type_i, index};
164 }
165
166 template <int i>
167 void launch_timer(boost::units::quantity<boost::units::si::frequency> freq,
168 std::function<void()> on_expire)
169 {
170 launch_thread<goby::middleware::TimerThread<i>>(freq);
171 this->interthread()
172 .template subscribe_empty<goby::middleware::TimerThread<i>::expire_group>(on_expire);
173 }
174
175 template <int i> void join_timer() { join_thread<goby::middleware::TimerThread<i>>(); }
176
177 int running_thread_count() { return running_thread_count_; }
178
179 protected:
181
182 MultiThreadApplicationBase(boost::units::quantity<boost::units::si::frequency> loop_freq,
184 : goby::middleware::Application<Config>(),
185 MainThreadBase(this->app_cfg(), transporter, loop_freq)
186 {
188
189 interthread_.template subscribe<MainThreadBase::joinable_group_>(
190 [this](const ThreadIdentifier& joinable)
191 { _join_thread(joinable.type_i, joinable.index); });
192 }
193
195
196 InterThreadTransporter& interthread() { return interthread_; }
197 virtual void post_finalize() override { join_all_threads(); }
198
199 std::map<std::type_index, std::map<int, ThreadManagement>>& threads() { return threads_; }
200
202 {
203 if (running_thread_count_ > 0)
204 {
206 goby::glog << "Requesting that all remaining threads shutdown cleanly..."
207 << std::endl;
208
210 ti.all_threads = true;
212
213 // allow the threads to self-join
214 while (running_thread_count_ > 0)
215 {
217 << running_thread_count_
218 << " threads." << std::endl;
219
221 }
222
223 goby::glog.is(goby::util::logger::DEBUG1) && goby::glog << "All threads cleanly joined."
224 << std::endl;
225 }
226 }
227
228 private:
229 void run() override
230 {
231 try
232 {
234 }
235 catch (std::exception& e)
236 {
238 goby::glog << "MultiThreadApplicationBase:: uncaught exception: " << e.what()
239 << std::endl;
240 std::terminate();
241 }
242 }
243
244 template <typename ThreadType, typename ThreadConfig, bool has_index, bool has_config>
245 void _launch_thread(int index, const ThreadConfig& cfg);
246
247 void _join_thread(const std::type_index& type_i, int index);
248};
249
254template <class Config, template <class InnerTransporter> class InterProcessPortal>
257 Config, InterVehicleForwarder<InterProcessPortal<InterThreadTransporter>>>,
258 public coroner::ApplicationInterThread<MultiThreadApplication<Config, InterProcessPortal>>,
259 public terminate::Application<MultiThreadApplication<Config, InterProcessPortal>>
260{
261 private:
262 InterProcessPortal<InterThreadTransporter> interprocess_;
266
268 MultiThreadApplication<Config, InterProcessPortal>>;
269
270 friend class terminate::Application<MultiThreadApplication<Config, InterProcessPortal>>;
271
272 public:
276 MultiThreadApplication(double loop_freq_hertz = 0)
277 : MultiThreadApplication(loop_freq_hertz * boost::units::si::hertz)
278 {
279 }
280
284 MultiThreadApplication(boost::units::quantity<boost::units::si::frequency> loop_freq)
285 : Base(loop_freq, &intervehicle_),
286 interprocess_(Base::interthread(), detail::make_interprocess_config(
287 this->app_cfg().interprocess(), this->app_name())),
288 intervehicle_(interprocess_)
289 {
290 this->subscribe_terminate();
291
292 // we subscribe interthread as the HealthMonitorThread subscribes interprocess and handles aggregating all the responses
293 this->subscribe_coroner();
294
295 this->interprocess().template subscribe<goby::middleware::groups::datum_update>(
296 [this](const protobuf::DatumUpdate& datum_update)
297 {
298 this->configure_geodesy(
299 {datum_update.datum().lat_with_units(), datum_update.datum().lon_with_units()});
300 });
301
302 this->interprocess().template publish<goby::middleware::groups::configuration>(
303 this->app_cfg());
304
305 if (this->app_cfg().app().health_cfg().run_health_monitor_thread())
306 this->template launch_thread_without_cfg<HealthMonitorThread>();
307 }
308
310
311 protected:
312 InterThreadTransporter& interthread() { return interprocess_.inner(); }
313 InterProcessPortal<InterThreadTransporter>& interprocess() { return interprocess_; }
318
320 {
321 health.set_name(this->app_name());
323 }
324
326 virtual void post_initialize() override { interprocess().ready(); };
327
328 private:
329 void preseed_hook(std::shared_ptr<protobuf::ProcessHealth>& health_response) override
330 {
331 // preseed all threads with error in case they don't respond
332 for (const auto& type_map_p : this->threads())
333 {
334 for (const auto& index_manager_p : type_map_p.second)
335 {
336 const auto& thread_manager = index_manager_p.second;
337 auto& thread_health = *health_response->mutable_main()->add_child();
338 thread_health.set_name(thread_manager.name);
339 thread_health.set_uid(thread_manager.uid);
342 }
343 }
344 }
345};
346
350template <class Config>
352 : public MultiThreadApplicationBase<Config, InterThreadTransporter>
353{
354 private:
356
357 public:
361 MultiThreadStandaloneApplication(double loop_freq_hertz = 0)
362 : MultiThreadStandaloneApplication(loop_freq_hertz * boost::units::si::hertz)
363 {
364 }
365
369 MultiThreadStandaloneApplication(boost::units::quantity<boost::units::si::frequency> loop_freq)
370 : Base(loop_freq, &Base::interthread())
371 {
372 }
374
375 protected:
376};
377
381template <class Config> class MultiThreadTest : public MultiThreadStandaloneApplication<Config>
382{
383 private:
385
386 public:
391 boost::units::quantity<boost::units::si::frequency> loop_freq = 0 * boost::units::si::hertz)
392 : Base(loop_freq)
393 {
394 }
395 virtual ~MultiThreadTest() {}
396
397 protected:
398 // so we can add on threads that publish to the outside for testing
401};
402
403} // namespace middleware
404
405template <class Config, class Transporter>
406std::exception_ptr
408
409template <class Config, class Transporter>
410template <typename ThreadType, typename ThreadConfig, bool has_index, bool has_config>
412 int index, const ThreadConfig& cfg)
413{
414 std::type_index type_i = std::type_index(typeid(ThreadType));
415
416 if (threads_[type_i].count(index) && threads_[type_i][index].alive)
417 throw(Exception(std::string("Thread of type: ") + type_i.name() + " and index " +
418 std::to_string(index) + " is already launched and running."));
419
420 auto& thread_manager = threads_[type_i][index];
421 thread_manager.alive = true;
422 thread_manager.name = boost::core::demangle(typeid(ThreadType).name());
423 if (has_index)
424 thread_manager.name += "/" + std::to_string(index);
425 thread_manager.uid = thread_uid_++;
426
427 // copy configuration
428 auto thread_lambda = [this, type_i, index, cfg, &thread_manager]()
429 {
430#ifdef __APPLE__
431 // set thread name for debugging purposes
432 pthread_setname_np(thread_manager.name.c_str());
433#endif
434 try
435 {
436 std::shared_ptr<ThreadType> goby_thread(
437 detail::ThreadTypeSelector<ThreadType, ThreadConfig, has_index, has_config>::thread(
438 cfg, index));
439
440 goby_thread->set_name(thread_manager.name);
441 goby_thread->set_type_index(type_i);
442 goby_thread->set_uid(thread_manager.uid);
443 goby_thread->run(thread_manager.alive);
444 }
445 catch (...)
446 {
447 thread_exception_ = std::current_exception();
448 }
449
450 interthread_.publish<MainThreadBase::joinable_group_>(ThreadIdentifier{type_i, index});
451 };
452
453 thread_manager.thread = std::unique_ptr<std::thread>(new std::thread(thread_lambda));
454
455#ifndef __APPLE__
456 // set thread name for debugging purposes
457 pthread_setname_np(thread_manager.thread->native_handle(), thread_manager.name.c_str());
458#endif
459
460 ++running_thread_count_;
461}
462
463template <class Config, class Transporter>
465 const std::type_index& type_i, int index)
466{
467 if (!threads_.count(type_i) || !threads_[type_i].count(index))
468 throw(Exception(std::string("No thread of type: ") + type_i.name() + " and index " +
469 std::to_string(index) + " to join."));
470
471 if (threads_[type_i][index].thread)
472 {
474 goby::glog << "Joining thread: " << type_i.name() << " index " << index << std::endl;
475
476 threads_[type_i][index].alive = false;
477 threads_[type_i][index].thread->join();
478 threads_[type_i][index].thread.reset();
479 --running_thread_count_;
480
482 goby::glog << "Joined thread: " << type_i.name() << " index " << index << std::endl;
483
484 if (thread_exception_)
485 {
487 goby::glog << "Thread type: " << type_i.name() << ", index: " << index
488 << " had an uncaught exception" << std::endl;
489 std::rethrow_exception(thread_exception_);
490 }
491 }
492 else
493 {
495 goby::glog << "Already joined thread: " << type_i.name() << " index " << index
496 << std::endl;
497 }
498}
499
500} // namespace goby
501
502#endif
Base class for Goby applications. Generally you will want to use SingleThreadApplication or MultiThre...
Definition interface.h:72
void configure_geodesy(goby::util::UTMGeodesy::LatLonPoint datum)
Definition interface.h:288
const Config & app_cfg()
Accesses configuration object passed at launch.
Definition interface.h:114
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:60
A transporter for the interthread layer.
Definition interthread.h:59
Implements the forwarder concept for the intervehicle layer.
Base class for creating multiple thread applications.
void launch_timer(boost::units::quantity< boost::units::si::frequency > freq, std::function< void()> on_expire)
std::map< std::type_index, std::map< int, ThreadManagement > > & threads()
void launch_thread(const ThreadConfig &cfg)
void launch_thread(int index, const ThreadConfig &cfg)
MultiThreadApplicationBase(boost::units::quantity< boost::units::si::frequency > loop_freq, Transporter *transporter)
virtual void post_finalize() override
Called just after finalize.
Base class for building multithreaded applications for a given implementation of the InterProcessPort...
InterVehicleForwarder< InterProcessPortal< InterThreadTransporter > > & intervehicle()
virtual void post_initialize() override
Assume all required subscriptions are done in the Constructor or in initialize(). If this isn't the c...
InterProcessPortal< InterThreadTransporter > & interprocess()
virtual void health(goby::middleware::protobuf::ThreadHealth &health) override
Called when HealthRequest is made by goby_coroner.
MultiThreadApplication(boost::units::quantity< boost::units::si::frequency > loop_freq)
Construct the application calling loop() at the given frequency (boost::units overload)
MultiThreadApplication(double loop_freq_hertz=0)
Construct the application calling loop() at the given frequency (double overload)
InterThreadTransporter & interthread()
Base class for building multithreaded Goby applications that do not have perform any interprocess (or...
MultiThreadStandaloneApplication(double loop_freq_hertz=0)
Construct the application calling loop() at the given frequency (double overload)
MultiThreadStandaloneApplication(boost::units::quantity< boost::units::si::frequency > loop_freq)
Construct the application calling loop() at the given frequency (boost::units overload)
Base class for building multithreaded Goby tests that do not have perform any interprocess (or outer)...
InterThreadTransporter & intervehicle()
MultiThreadTest(boost::units::quantity< boost::units::si::frequency > loop_freq=0 *boost::units::si::hertz)
Construct the test running at the given frequency.
InterThreadTransporter & interprocess()
void publish(const Data &data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (const reference variant)
Definition interface.h:215
Represents a thread of execution within the Goby middleware, interleaving periodic events (loop()) wi...
Definition thread.h:61
static constexpr goby::middleware::Group shutdown_group_
Definition thread.h:156
void thread_health(goby::middleware::protobuf::ThreadHealth &health)
Definition thread.h:209
Thread that simply publishes an empty message on its loop interval to TimerThread::group.
TimerThread(const boost::units::quantity< boost::units::si::frequency > &freq)
static constexpr goby::middleware::Group expire_group
const ::goby::middleware::protobuf::LatLonPoint & datum() const
void set_state(::goby::middleware::protobuf::HealthState value)
bool is(goby::util::logger::Verbosity verbosity)
void set_lock_action(logger_lock::LockAction lock_action)
detail namespace with internal helper functions
Definition json.hpp:247
InterProcessPortalImplementation< InnerTransporter, middleware::InterProcessPortalBase > InterProcessPortal
The global namespace for the Goby project.
util::FlexOstream glog
Access the Goby logger through this object.