Goby3 3.4.0
2026.04.13
Loading...
Searching...
No Matches
multi_thread.h
Go to the documentation of this file.
1// Copyright 2017-2026:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6// James D. Turner <james.turner@nrl.navy.mil>
7//
8//
9// This file is part of the Goby Underwater Autonomy Project Libraries
10// ("The Goby Libraries").
11//
12// The Goby Libraries are free software: you can redistribute them and/or modify
13// them under the terms of the GNU Lesser General Public License as published by
14// the Free Software Foundation, either version 2.1 of the License, or
15// (at your option) any later version.
16//
17// The Goby Libraries are distributed in the hope that they will be useful,
18// but WITHOUT ANY WARRANTY; without even the implied warranty of
19// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20// GNU Lesser General Public License for more details.
21//
22// You should have received a copy of the GNU Lesser General Public License
23// along with Goby. If not, see <http://www.gnu.org/licenses/>.
24
25#ifndef GOBY_MIDDLEWARE_APPLICATION_MULTI_THREAD_H
26#define GOBY_MIDDLEWARE_APPLICATION_MULTI_THREAD_H
27
28#include <boost/core/demangle.hpp>
29#include <boost/units/systems/si.hpp>
30
35
36#include "goby/exception.h"
43
48
49namespace goby
50{
51namespace middleware
52{
62template <int i>
64 : public Thread<boost::units::quantity<boost::units::si::frequency>, InterThreadTransporter>,
65 public coroner::Thread<TimerThread<i>>
66{
67 using ThreadBase =
69
70 friend class coroner::Thread<TimerThread<i>>;
71
72 public:
73 static constexpr goby::middleware::Group expire_group{"goby::middleware::TimerThread::timer",
74 i};
75
76 TimerThread(const boost::units::quantity<boost::units::si::frequency>& freq)
77 : ThreadBase(freq, &interthread_, freq)
78 {
79 this->subscribe_coroner();
80 }
81
82 private:
83 void loop() override { interthread_.template publish_empty<expire_group>(); }
84
85 InterThreadTransporter& interthread() { return interthread_; }
86
87 private:
88 InterThreadTransporter interthread_;
89};
90
92
97template <class Config, class Transporter>
99 public goby::middleware::Thread<Config, Transporter>
100{
101 private:
102 struct ThreadManagement
103 {
104 ThreadManagement() = default;
105 ~ThreadManagement()
106 {
107 if (thread)
108 {
110 goby::glog << "Joining thread: " << name << std::endl;
111 alive = false;
112 thread->join();
113 }
114 }
115
116 std::atomic<bool> alive{true};
117 std::string name;
118 int uid;
119 std::unique_ptr<std::thread> thread;
120 };
121
122 static std::exception_ptr thread_exception_;
123
124 std::map<std::type_index, std::map<int, ThreadManagement>> threads_;
125 int thread_uid_{0};
126 int running_thread_count_{0};
127 InterThreadTransporter interthread_;
128
129 public:
130 template <typename ThreadType> void launch_thread()
131 {
132 _launch_thread<ThreadType, Config, false, true>(-1, this->app_cfg());
133 }
134 template <typename ThreadType> void launch_thread(int index)
135 {
136 _launch_thread<ThreadType, Config, true, true>(index, this->app_cfg());
137 }
138
139 template <typename ThreadType, typename ThreadConfig>
140 void launch_thread(const ThreadConfig& cfg)
141 {
142 _launch_thread<ThreadType, ThreadConfig, false, true>(-1, cfg);
143 }
144 template <typename ThreadType, typename ThreadConfig>
145 void launch_thread(int index, const ThreadConfig& cfg)
146 {
147 _launch_thread<ThreadType, ThreadConfig, true, true>(index, cfg);
148 }
149
150 template <typename ThreadType> void launch_thread_without_cfg()
151 {
152 _launch_thread<ThreadType, Config, false, false>(-1, this->app_cfg());
153 }
154 template <typename ThreadType> void launch_thread_without_cfg(int index)
155 {
156 _launch_thread<ThreadType, Config, true, false>(index, this->app_cfg());
157 }
158
159 template <typename ThreadType> void join_thread(int index = -1)
160 {
161 // request thread self-join
162 auto type_i = std::type_index(typeid(ThreadType));
163 ThreadIdentifier ti{type_i, index};
165
166 // block until the thread has actually joined
167 if (threads_.count(type_i) && threads_[type_i].count(index))
168 {
169 auto& thread_manager = threads_[type_i][index];
170 while (thread_manager.thread) { MainThreadBase::transporter().poll(); }
171 }
172 }
173
174 template <int i>
175 void launch_timer(boost::units::quantity<boost::units::si::frequency> freq,
176 std::function<void()> on_expire)
177 {
178 launch_thread<goby::middleware::TimerThread<i>>(freq);
179 this->interthread()
180 .template subscribe_empty<goby::middleware::TimerThread<i>::expire_group>(on_expire);
181 }
182
183 template <int i> void join_timer() { join_thread<goby::middleware::TimerThread<i>>(); }
184
185 int running_thread_count() { return running_thread_count_; }
186
187 protected:
189
190 MultiThreadApplicationBase(boost::units::quantity<boost::units::si::frequency> loop_freq,
192 : goby::middleware::Application<Config>(),
193 MainThreadBase(this->app_cfg(), transporter, this->choose_loop_freq(loop_freq))
194 {
196
197 interthread_.template subscribe<MainThreadBase::joinable_group_>(
198 [this](const ThreadIdentifier& joinable)
199 { _join_thread(joinable.type_i, joinable.index); });
200 }
201
203
204 InterThreadTransporter& interthread() { return interthread_; }
205 virtual void post_finalize() override { join_all_threads(); }
206
207 std::map<std::type_index, std::map<int, ThreadManagement>>& threads() { return threads_; }
208
210 {
211 if (running_thread_count_ > 0)
212 {
214 goby::glog << "Requesting that all remaining threads shutdown cleanly..."
215 << std::endl;
216
218 ti.all_threads = true;
220
221 // allow the threads to self-join
222 while (running_thread_count_ > 0)
223 {
225 << running_thread_count_
226 << " threads." << std::endl;
227
228 MainThreadBase::transporter().template poll<goby::time::SteadyClock>(
229 std::chrono::milliseconds(100));
230 }
231
232 goby::glog.is(goby::util::logger::DEBUG1) && goby::glog << "All threads cleanly joined."
233 << std::endl;
234 }
235 }
236
237 private:
238 void run() override
239 {
240 try
241 {
243 }
244 catch (std::exception& e)
245 {
247 goby::glog << "MultiThreadApplicationBase:: uncaught exception: " << e.what()
248 << std::endl;
249 std::terminate();
250 }
251 }
252
253 template <typename ThreadType, typename ThreadConfig, bool has_index, bool has_config>
254 void _launch_thread(int index, const ThreadConfig& cfg);
255
256 void _join_thread(const std::type_index& type_i, int index);
257};
258
263template <class Config, template <class InnerTransporter> class InterProcessPortal>
266 Config, InterVehicleForwarder<InterProcessPortal<InterThreadTransporter>>>,
267 public coroner::ApplicationInterThread<MultiThreadApplication<Config, InterProcessPortal>>,
268 public terminate::Application<MultiThreadApplication<Config, InterProcessPortal>>
269{
270 private:
271 InterProcessPortal<InterThreadTransporter> interprocess_;
275
277 MultiThreadApplication<Config, InterProcessPortal>>;
278
279 friend class terminate::Application<MultiThreadApplication<Config, InterProcessPortal>>;
280 template <typename App> friend class goby::middleware::julia::ApplicationWrapper;
281
282 public:
286 MultiThreadApplication(double loop_freq_hertz = 0)
287 : MultiThreadApplication(loop_freq_hertz * boost::units::si::hertz)
288 {
289 }
290
294 MultiThreadApplication(boost::units::quantity<boost::units::si::frequency> loop_freq)
295 : Base(loop_freq, &intervehicle_),
296 interprocess_(Base::interthread(), detail::make_interprocess_config(
297 this->app_cfg().interprocess(), this->app_name())),
298 intervehicle_(interprocess_)
299 {
300 this->subscribe_terminate();
301
302 // we subscribe interthread as the HealthMonitorThread subscribes interprocess and handles aggregating all the responses
303 this->subscribe_coroner();
304
305 this->interprocess().template subscribe<goby::middleware::groups::datum_update>(
306 [this](const protobuf::DatumUpdate& datum_update)
307 {
308 this->configure_geodesy(
309 {datum_update.datum().lat_with_units(), datum_update.datum().lon_with_units()});
310 });
311
312 this->interprocess().template publish<goby::middleware::groups::configuration>(
313 this->app_cfg());
314
315 if (this->app_cfg().app().health_cfg().run_health_monitor_thread())
317 typename InterProcessPortal<InterThreadTransporter>::implementation_tag>>();
318 }
319
321
322 protected:
323 InterThreadTransporter& interthread() { return interprocess_.inner(); }
324 InterProcessPortal<InterThreadTransporter>& interprocess() { return interprocess_; }
329
331 {
332 health.set_name(this->app_name());
334 }
335
337 virtual void post_initialize() override { interprocess().ready(); };
338
339 private:
340 void preseed_hook(std::shared_ptr<protobuf::ProcessHealth>& health_response) override
341 {
342 // preseed all threads with error in case they don't respond
343 for (const auto& type_map_p : this->threads())
344 {
345 for (const auto& index_manager_p : type_map_p.second)
346 {
347 const auto& thread_manager = index_manager_p.second;
348 auto& thread_health = *health_response->mutable_main()->add_child();
349 thread_health.set_name(thread_manager.name);
350 thread_health.set_uid(thread_manager.uid);
353 }
354 }
355 }
356};
357
361template <class Config>
363 : public MultiThreadApplicationBase<Config, InterThreadTransporter>
364{
365 private:
367
368 public:
372 MultiThreadStandaloneApplication(double loop_freq_hertz = 0)
373 : MultiThreadStandaloneApplication(loop_freq_hertz * boost::units::si::hertz)
374 {
375 }
376
380 MultiThreadStandaloneApplication(boost::units::quantity<boost::units::si::frequency> loop_freq)
381 : Base(loop_freq, &Base::interthread())
382 {
383 }
385
386 protected:
387};
388
392template <class Config> class MultiThreadTest : public MultiThreadStandaloneApplication<Config>
393{
394 private:
396
397 public:
402 boost::units::quantity<boost::units::si::frequency> loop_freq = 0 * boost::units::si::hertz)
403 : Base(loop_freq)
404 {
405 }
406 virtual ~MultiThreadTest() {}
407
408 protected:
409 // so we can add on threads that publish to the outside for testing
412};
413
415template <typename Config> class StandaloneThread : public Thread<Config, InterThreadTransporter>
416{
417 public:
418 StandaloneThread(const Config& cfg, double loop_freq_hertz = 0, int index = -1)
419 : StandaloneThread(cfg, loop_freq_hertz * boost::units::si::hertz, index)
420 {
421 }
422
423 StandaloneThread(const Config& cfg,
424 boost::units::quantity<boost::units::si::frequency> loop_freq, int index = -1)
425 : Thread<Config, InterThreadTransporter>(cfg, loop_freq, index)
426 {
427 interthread_.reset(new InterThreadTransporter);
428
429 this->set_transporter(interthread_.get());
430 }
431
433
434 private:
435 std::unique_ptr<InterThreadTransporter> interthread_;
436};
437
438} // namespace middleware
439
440template <class Config, class Transporter>
441std::exception_ptr
443
444template <class Config, class Transporter>
445template <typename ThreadType, typename ThreadConfig, bool has_index, bool has_config>
447 int index, const ThreadConfig& cfg)
448{
449 std::type_index type_i = std::type_index(typeid(ThreadType));
450
451 if (threads_[type_i].count(index) && threads_[type_i][index].alive)
452 throw(Exception(std::string("Thread of type: ") + type_i.name() + " and index " +
453 std::to_string(index) + " is already launched and running."));
454
455 auto& thread_manager = threads_[type_i][index];
456 thread_manager.alive = true;
457 thread_manager.name = boost::core::demangle(typeid(ThreadType).name());
458 if (has_index)
459 thread_manager.name += "/" + std::to_string(index);
460 thread_manager.uid = thread_uid_++;
461
462 // copy configuration
463 auto thread_lambda = [this, type_i, index, cfg, &thread_manager]()
464 {
465#ifdef __APPLE__
466 // set thread name for debugging purposes
467 pthread_setname_np(thread_manager.name.c_str());
468#endif
469 try
470 {
471 std::shared_ptr<ThreadType> goby_thread(
472 detail::ThreadTypeSelector<ThreadType, ThreadConfig, has_index, has_config>::thread(
473 cfg, index));
474
475 goby_thread->set_name(thread_manager.name);
476 goby_thread->set_type_index(type_i);
477 goby_thread->set_uid(thread_manager.uid);
478 goby_thread->run(thread_manager.alive);
479 }
480 catch (...)
481 {
482 thread_exception_ = std::current_exception();
483 }
484
485 interthread_.publish<MainThreadBase::joinable_group_>(ThreadIdentifier{type_i, index});
486 };
487
488 thread_manager.thread = std::unique_ptr<std::thread>(new std::thread(thread_lambda));
489
490#ifndef __APPLE__
491 // set thread name for debugging purposes
492 pthread_setname_np(thread_manager.thread->native_handle(), thread_manager.name.c_str());
493#endif
494
495 ++running_thread_count_;
496}
497
498template <class Config, class Transporter>
500 const std::type_index& type_i, int index)
501{
502 if (!threads_.count(type_i) || !threads_[type_i].count(index))
503 throw(Exception(std::string("No thread of type: ") + type_i.name() + " and index " +
504 std::to_string(index) + " to join."));
505
506 if (threads_[type_i][index].thread)
507 {
509 goby::glog << "Joining thread: " << type_i.name() << " index " << index << std::endl;
510
511 threads_[type_i][index].alive = false;
512 threads_[type_i][index].thread->join();
513 threads_[type_i][index].thread.reset();
514 --running_thread_count_;
515
517 goby::glog << "Joined thread: " << type_i.name() << " index " << index << std::endl;
518
519 if (thread_exception_)
520 {
522 goby::glog << "Thread type: " << type_i.name() << ", index: " << index
523 << " had an uncaught exception" << std::endl;
524 std::rethrow_exception(thread_exception_);
525 }
526 }
527 else
528 {
530 goby::glog << "Already joined thread: " << type_i.name() << " index " << index
531 << std::endl;
532 }
533}
534
535} // namespace goby
536
537#endif
Base class for Goby applications. Generally you will want to use SingleThreadApplication or MultiThre...
Definition interface.h:78
void configure_geodesy(goby::util::UTMGeodesy::LatLonPoint datum)
Definition interface.h:326
boost::units::quantity< boost::units::si::frequency > choose_loop_freq(boost::units::quantity< boost::units::si::frequency > compiled_loop_freq)
Definition interface.h:139
const Config & app_cfg()
Accesses configuration object passed at launch.
Definition interface.h:120
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:60
A transporter for the interthread layer.
Definition interthread.h:59
Implements the forwarder concept for the intervehicle layer.
Base class for creating multiple thread applications.
void launch_timer(boost::units::quantity< boost::units::si::frequency > freq, std::function< void()> on_expire)
std::map< std::type_index, std::map< int, ThreadManagement > > & threads()
void launch_thread(const ThreadConfig &cfg)
void launch_thread(int index, const ThreadConfig &cfg)
MultiThreadApplicationBase(boost::units::quantity< boost::units::si::frequency > loop_freq, Transporter *transporter)
virtual void post_finalize() override
Called just after finalize.
Base class for building multithreaded applications for a given implementation of the InterProcessPort...
InterVehicleForwarder< InterProcessPortal< InterThreadTransporter > > & intervehicle()
virtual void post_initialize() override
Assume all required subscriptions are done in the Constructor or in initialize(). If this isn't the c...
InterProcessPortal< InterThreadTransporter > & interprocess()
virtual void health(goby::middleware::protobuf::ThreadHealth &health) override
Called when HealthRequest is made by goby_coroner.
MultiThreadApplication(boost::units::quantity< boost::units::si::frequency > loop_freq)
Construct the application calling loop() at the given frequency (boost::units overload)
MultiThreadApplication(double loop_freq_hertz=0)
Construct the application calling loop() at the given frequency (double overload)
InterThreadTransporter & interthread()
Base class for building multithreaded Goby applications that do not have perform any interprocess (or...
MultiThreadStandaloneApplication(double loop_freq_hertz=0)
Construct the application calling loop() at the given frequency (double overload)
MultiThreadStandaloneApplication(boost::units::quantity< boost::units::si::frequency > loop_freq)
Construct the application calling loop() at the given frequency (boost::units overload)
Base class for building multithreaded Goby tests that do not have perform any interprocess (or outer)...
InterThreadTransporter & intervehicle()
MultiThreadTest(boost::units::quantity< boost::units::si::frequency > loop_freq=0 *boost::units::si::hertz)
Construct the test running at the given frequency.
InterThreadTransporter & interprocess()
Class for use with MultiThreadStandaloneApplication (interthread only)
StandaloneThread(const Config &cfg, boost::units::quantity< boost::units::si::frequency > loop_freq, int index=-1)
InterThreadTransporter & interthread()
StandaloneThread(const Config &cfg, double loop_freq_hertz=0, int index=-1)
void publish(const Data &data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (const reference variant)
Definition interface.h:245
Represents a thread of execution within the Goby middleware, interleaving periodic events (loop()) wi...
Definition thread.h:61
static constexpr goby::middleware::Group shutdown_group_
Definition thread.h:185
void set_transporter(InterThreadTransporter *transporter)
Definition thread.h:202
void thread_health(goby::middleware::protobuf::ThreadHealth &health)
Definition thread.h:223
Thread that simply publishes an empty message on its loop interval to TimerThread::group.
TimerThread(const boost::units::quantity< boost::units::si::frequency > &freq)
static constexpr goby::middleware::Group expire_group
const ::goby::middleware::protobuf::LatLonPoint & datum() const
void set_state(::goby::middleware::protobuf::HealthState value)
bool is(goby::util::logger::Verbosity verbosity)
void set_lock_action(logger_lock::LockAction lock_action)
detail namespace with internal helper functions
Definition json.hpp:247
InterProcessPortalImplementation< InnerTransporter, middleware::InterProcessPortalBase, detail::InterProcessTag > InterProcessPortal
The global namespace for the Goby project.
util::FlexOstream glog
Access the Goby logger through this object.