Goby3 3.5.1
2026.06.04
Loading...
Searching...
No Matches
multi_thread.h
Go to the documentation of this file.
1// Copyright 2017-2026:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6// Copilot <198982749+Copilot@users.noreply.github.com>
7// James D. Turner <james.turner@nrl.navy.mil>
8//
9//
10// This file is part of the Goby Underwater Autonomy Project Libraries
11// ("The Goby Libraries").
12//
13// The Goby Libraries are free software: you can redistribute them and/or modify
14// them under the terms of the GNU Lesser General Public License as published by
15// the Free Software Foundation, either version 2.1 of the License, or
16// (at your option) any later version.
17//
18// The Goby Libraries are distributed in the hope that they will be useful,
19// but WITHOUT ANY WARRANTY; without even the implied warranty of
20// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21// GNU Lesser General Public License for more details.
22//
23// You should have received a copy of the GNU Lesser General Public License
24// along with Goby. If not, see <http://www.gnu.org/licenses/>.
25
26#ifndef GOBY_MIDDLEWARE_APPLICATION_MULTI_THREAD_H
27#define GOBY_MIDDLEWARE_APPLICATION_MULTI_THREAD_H
28
29#include <boost/core/demangle.hpp>
30#include <boost/units/systems/si.hpp>
31
36
37#include "goby/exception.h"
44
49
50namespace goby
51{
52namespace middleware
53{
63template <int i>
65 : public Thread<boost::units::quantity<boost::units::si::frequency>, InterThreadTransporter>,
66 public coroner::Thread<TimerThread<i>>
67{
68 using ThreadBase =
70
71 friend class coroner::Thread<TimerThread<i>>;
72
73 public:
74 static constexpr goby::middleware::Group expire_group{"goby::middleware::TimerThread::timer",
75 i};
76
77 TimerThread(const boost::units::quantity<boost::units::si::frequency>& freq)
78 : ThreadBase(freq, &interthread_, freq)
79 {
80 this->subscribe_coroner();
81 }
82
83 private:
84 void loop() override { interthread_.template publish_empty<expire_group>(); }
85
86 InterThreadTransporter& interthread() { return interthread_; }
87
88 private:
89 InterThreadTransporter interthread_;
90};
91
93
98template <class Config, class Transporter>
100 public goby::middleware::Thread<Config, Transporter>
101{
102 private:
103 struct ThreadManagement
104 {
105 ThreadManagement() = default;
106 ~ThreadManagement()
107 {
108 if (thread)
109 {
111 goby::glog << "Joining thread: " << name << std::endl;
112 alive = false;
113 thread->join();
114 }
115 }
116
117 std::atomic<bool> alive{true};
118 std::string name;
119 int uid;
120 std::unique_ptr<std::thread> thread;
121 };
122
123 static std::exception_ptr thread_exception_;
124
125 std::map<std::type_index, std::map<int, ThreadManagement>> threads_;
126 int thread_uid_{0};
127 int running_thread_count_{0};
128 InterThreadTransporter interthread_;
129
130 public:
131 template <typename ThreadType> void launch_thread()
132 {
133 _launch_thread<ThreadType, Config, false, true>(-1, this->app_cfg());
134 }
135 template <typename ThreadType> void launch_thread(int index)
136 {
137 _launch_thread<ThreadType, Config, true, true>(index, this->app_cfg());
138 }
139
140 template <typename ThreadType, typename ThreadConfig>
141 void launch_thread(const ThreadConfig& cfg)
142 {
143 _launch_thread<ThreadType, ThreadConfig, false, true>(-1, cfg);
144 }
145 template <typename ThreadType, typename ThreadConfig>
146 void launch_thread(int index, const ThreadConfig& cfg)
147 {
148 _launch_thread<ThreadType, ThreadConfig, true, true>(index, cfg);
149 }
150
151 template <typename ThreadType> void launch_thread_without_cfg()
152 {
153 _launch_thread<ThreadType, Config, false, false>(-1, this->app_cfg());
154 }
155 template <typename ThreadType> void launch_thread_without_cfg(int index)
156 {
157 _launch_thread<ThreadType, Config, true, false>(index, this->app_cfg());
158 }
159
160 template <typename ThreadType> void join_thread(int index = -1)
161 {
162 // request thread self-join
163 auto type_i = std::type_index(typeid(ThreadType));
164 ThreadIdentifier ti{type_i, index};
166
167 // block until the thread has actually joined
168 if (threads_.count(type_i) && threads_[type_i].count(index))
169 {
170 auto& thread_manager = threads_[type_i][index];
171 while (thread_manager.thread) { MainThreadBase::transporter().poll(); }
172 }
173 }
174
175 template <int i>
176 void launch_timer(boost::units::quantity<boost::units::si::frequency> freq,
177 std::function<void()> on_expire)
178 {
179 launch_thread<goby::middleware::TimerThread<i>>(freq);
180 this->interthread()
181 .template subscribe_empty<goby::middleware::TimerThread<i>::expire_group>(on_expire);
182 }
183
184 template <int i> void join_timer() { join_thread<goby::middleware::TimerThread<i>>(); }
185
186 int running_thread_count() { return running_thread_count_; }
187
188 protected:
190
191 MultiThreadApplicationBase(boost::units::quantity<boost::units::si::frequency> loop_freq,
193 : goby::middleware::Application<Config>(),
194 MainThreadBase(this->app_cfg(), transporter, this->choose_loop_freq(loop_freq))
195 {
197
198 interthread_.template subscribe<MainThreadBase::joinable_group_>(
199 [this](const ThreadIdentifier& joinable)
200 { _join_thread(joinable.type_i, joinable.index); });
201 }
202
204
205 InterThreadTransporter& interthread() { return interthread_; }
206 virtual void post_finalize() override { join_all_threads(); }
207
208 std::map<std::type_index, std::map<int, ThreadManagement>>& threads() { return threads_; }
209
211 {
212 if (running_thread_count_ > 0)
213 {
215 goby::glog << "Requesting that all remaining threads shutdown cleanly..."
216 << std::endl;
217
219 ti.all_threads = true;
221
222 // allow the threads to self-join
223 while (running_thread_count_ > 0)
224 {
226 << running_thread_count_
227 << " threads." << std::endl;
228
229 MainThreadBase::transporter().template poll<goby::time::SteadyClock>(
230 std::chrono::milliseconds(100));
231 }
232
233 goby::glog.is(goby::util::logger::DEBUG1) && goby::glog << "All threads cleanly joined."
234 << std::endl;
235 }
236 }
237
238 private:
239 void run() override
240 {
241 try
242 {
244 }
245 catch (std::exception& e)
246 {
248 goby::glog << "MultiThreadApplicationBase:: uncaught exception: " << e.what()
249 << std::endl;
250 std::terminate();
251 }
252 }
253
254 template <typename ThreadType, typename ThreadConfig, bool has_index, bool has_config>
255 void _launch_thread(int index, const ThreadConfig& cfg);
256
257 void _join_thread(const std::type_index& type_i, int index);
258};
259
264template <class Config, template <class InnerTransporter> class InterProcessPortal>
267 Config, InterVehicleForwarder<InterProcessPortal<InterThreadTransporter>>>,
268 public coroner::ApplicationInterThread<MultiThreadApplication<Config, InterProcessPortal>>,
269 public terminate::Application<MultiThreadApplication<Config, InterProcessPortal>>
270{
271 private:
272 InterProcessPortal<InterThreadTransporter> interprocess_;
276
278 MultiThreadApplication<Config, InterProcessPortal>>;
279
280 friend class terminate::Application<MultiThreadApplication<Config, InterProcessPortal>>;
281 template <typename App> friend class goby::middleware::julia::ApplicationWrapper;
282
283 public:
287 MultiThreadApplication(double loop_freq_hertz = 0)
288 : MultiThreadApplication(loop_freq_hertz * boost::units::si::hertz)
289 {
290 }
291
295 MultiThreadApplication(boost::units::quantity<boost::units::si::frequency> loop_freq)
296 : Base(loop_freq, &intervehicle_),
297 interprocess_(Base::interthread(), detail::make_interprocess_config(
298 this->app_cfg().interprocess(), this->app_name())),
299 intervehicle_(interprocess_)
300 {
301 this->subscribe_terminate();
302
303 // we subscribe interthread as the HealthMonitorThread subscribes interprocess and handles aggregating all the responses
304 this->subscribe_coroner();
305
306 this->interprocess().template subscribe<goby::middleware::groups::datum_update>(
307 [this](const protobuf::DatumUpdate& datum_update)
308 {
309 this->configure_geodesy(
310 {datum_update.datum().lat_with_units(), datum_update.datum().lon_with_units()});
311 });
312
313 this->interprocess().template publish<goby::middleware::groups::configuration>(
314 this->app_cfg());
315
316 if (this->app_cfg().app().health_cfg().run_health_monitor_thread())
318 typename InterProcessPortal<InterThreadTransporter>::implementation_tag>>();
319 }
320
322
323 protected:
324 InterThreadTransporter& interthread() { return interprocess_.inner(); }
325 InterProcessPortal<InterThreadTransporter>& interprocess() { return interprocess_; }
330
332 {
333 health.set_name(this->app_name());
335 }
336
338 virtual void post_initialize() override { interprocess().ready(); };
339
340 private:
341 void preseed_hook(std::shared_ptr<protobuf::ProcessHealth>& health_response) override
342 {
343 // preseed all threads with error in case they don't respond
344 for (const auto& type_map_p : this->threads())
345 {
346 for (const auto& index_manager_p : type_map_p.second)
347 {
348 const auto& thread_manager = index_manager_p.second;
349 auto& thread_health = *health_response->mutable_main()->add_child();
350 thread_health.set_name(thread_manager.name);
351 thread_health.set_uid(thread_manager.uid);
354 }
355 }
356 }
357};
358
362template <class Config>
364 : public MultiThreadApplicationBase<Config, InterThreadTransporter>
365{
366 private:
368
369 public:
373 MultiThreadStandaloneApplication(double loop_freq_hertz = 0)
374 : MultiThreadStandaloneApplication(loop_freq_hertz * boost::units::si::hertz)
375 {
376 }
377
381 MultiThreadStandaloneApplication(boost::units::quantity<boost::units::si::frequency> loop_freq)
382 : Base(loop_freq, &Base::interthread())
383 {
384 }
386
387 protected:
388};
389
393template <class Config> class MultiThreadTest : public MultiThreadStandaloneApplication<Config>
394{
395 private:
397
398 public:
403 boost::units::quantity<boost::units::si::frequency> loop_freq = 0 * boost::units::si::hertz)
404 : Base(loop_freq)
405 {
406 }
407 virtual ~MultiThreadTest() {}
408
409 protected:
410 // so we can add on threads that publish to the outside for testing
413};
414
416template <typename Config> class StandaloneThread : public Thread<Config, InterThreadTransporter>
417{
418 public:
419 StandaloneThread(const Config& cfg, double loop_freq_hertz = 0, int index = -1)
420 : StandaloneThread(cfg, loop_freq_hertz * boost::units::si::hertz, index)
421 {
422 }
423
424 StandaloneThread(const Config& cfg,
425 boost::units::quantity<boost::units::si::frequency> loop_freq, int index = -1)
426 : Thread<Config, InterThreadTransporter>(cfg, loop_freq, index)
427 {
428 interthread_.reset(new InterThreadTransporter);
429
430 this->set_transporter(interthread_.get());
431 }
432
434
435 private:
436 std::unique_ptr<InterThreadTransporter> interthread_;
437};
438
439} // namespace middleware
440
441template <class Config, class Transporter>
442std::exception_ptr
444
445template <class Config, class Transporter>
446template <typename ThreadType, typename ThreadConfig, bool has_index, bool has_config>
448 int index, const ThreadConfig& cfg)
449{
450 std::type_index type_i = std::type_index(typeid(ThreadType));
451
452 if (threads_[type_i].count(index) && threads_[type_i][index].alive)
453 throw(Exception(std::string("Thread of type: ") + type_i.name() + " and index " +
454 std::to_string(index) + " is already launched and running."));
455
456 auto& thread_manager = threads_[type_i][index];
457 thread_manager.alive = true;
458 thread_manager.name = boost::core::demangle(typeid(ThreadType).name());
459 if (has_index)
460 thread_manager.name += "/" + std::to_string(index);
461 thread_manager.uid = thread_uid_++;
462
463 // copy configuration
464 auto thread_lambda = [this, type_i, index, cfg, &thread_manager]()
465 {
466#ifdef __APPLE__
467 // set thread name for debugging purposes
468 pthread_setname_np(thread_manager.name.c_str());
469#endif
470 try
471 {
472 std::shared_ptr<ThreadType> goby_thread(
473 detail::ThreadTypeSelector<ThreadType, ThreadConfig, has_index, has_config>::thread(
474 cfg, index));
475
476 goby_thread->set_name(thread_manager.name);
477 goby_thread->set_type_index(type_i);
478 goby_thread->set_uid(thread_manager.uid);
479 goby_thread->run(thread_manager.alive);
480 }
481 catch (...)
482 {
483 thread_exception_ = std::current_exception();
484 }
485
486 interthread_.publish<MainThreadBase::joinable_group_>(ThreadIdentifier{type_i, index});
487 };
488
489 thread_manager.thread = std::unique_ptr<std::thread>(new std::thread(thread_lambda));
490
491#ifndef __APPLE__
492 // set thread name for debugging purposes
493 pthread_setname_np(thread_manager.thread->native_handle(), thread_manager.name.c_str());
494#endif
495
496 ++running_thread_count_;
497}
498
499template <class Config, class Transporter>
501 const std::type_index& type_i, int index)
502{
503 if (!threads_.count(type_i) || !threads_[type_i].count(index))
504 throw(Exception(std::string("No thread of type: ") + type_i.name() + " and index " +
505 std::to_string(index) + " to join."));
506
507 if (threads_[type_i][index].thread)
508 {
510 goby::glog << "Joining thread: " << type_i.name() << " index " << index << std::endl;
511
512 threads_[type_i][index].alive = false;
513 threads_[type_i][index].thread->join();
514 threads_[type_i][index].thread.reset();
515 --running_thread_count_;
516
518 goby::glog << "Joined thread: " << type_i.name() << " index " << index << std::endl;
519
520 if (thread_exception_)
521 {
523 goby::glog << "Thread type: " << type_i.name() << ", index: " << index
524 << " had an uncaught exception" << std::endl;
525 std::rethrow_exception(thread_exception_);
526 }
527 }
528 else
529 {
531 goby::glog << "Already joined thread: " << type_i.name() << " index " << index
532 << std::endl;
533 }
534}
535
536} // namespace goby
537
538#endif
Base class for Goby applications. Generally you will want to use SingleThreadApplication or MultiThre...
Definition interface.h:80
void configure_geodesy(goby::util::UTMGeodesy::LatLonPoint datum)
Definition interface.h:333
boost::units::quantity< boost::units::si::frequency > choose_loop_freq(boost::units::quantity< boost::units::si::frequency > compiled_loop_freq)
Definition interface.h:141
const Config & app_cfg()
Accesses configuration object passed at launch.
Definition interface.h:122
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:61
A transporter for the interthread layer.
Definition interthread.h:60
Implements the forwarder concept for the intervehicle layer.
Base class for creating multiple thread applications.
void launch_timer(boost::units::quantity< boost::units::si::frequency > freq, std::function< void()> on_expire)
std::map< std::type_index, std::map< int, ThreadManagement > > & threads()
void launch_thread(const ThreadConfig &cfg)
void launch_thread(int index, const ThreadConfig &cfg)
MultiThreadApplicationBase(boost::units::quantity< boost::units::si::frequency > loop_freq, Transporter *transporter)
virtual void post_finalize() override
Called just after finalize.
Base class for building multithreaded applications for a given implementation of the InterProcessPort...
InterVehicleForwarder< InterProcessPortal< InterThreadTransporter > > & intervehicle()
virtual void post_initialize() override
Assume all required subscriptions are done in the Constructor or in initialize(). If this isn't the c...
InterProcessPortal< InterThreadTransporter > & interprocess()
virtual void health(goby::middleware::protobuf::ThreadHealth &health) override
Called when HealthRequest is made by goby_coroner.
MultiThreadApplication(boost::units::quantity< boost::units::si::frequency > loop_freq)
Construct the application calling loop() at the given frequency (boost::units overload)
MultiThreadApplication(double loop_freq_hertz=0)
Construct the application calling loop() at the given frequency (double overload)
InterThreadTransporter & interthread()
Base class for building multithreaded Goby applications that do not perform any interprocess (or...
MultiThreadStandaloneApplication(double loop_freq_hertz=0)
Construct the application calling loop() at the given frequency (double overload)
MultiThreadStandaloneApplication(boost::units::quantity< boost::units::si::frequency > loop_freq)
Construct the application calling loop() at the given frequency (boost::units overload)
Base class for building multithreaded Goby tests that do not perform any interprocess (or outer)...
InterThreadTransporter & intervehicle()
MultiThreadTest(boost::units::quantity< boost::units::si::frequency > loop_freq=0 *boost::units::si::hertz)
Construct the test running at the given frequency.
InterThreadTransporter & interprocess()
Class for use with MultiThreadStandaloneApplication (interthread only)
StandaloneThread(const Config &cfg, boost::units::quantity< boost::units::si::frequency > loop_freq, int index=-1)
InterThreadTransporter & interthread()
StandaloneThread(const Config &cfg, double loop_freq_hertz=0, int index=-1)
void publish(const Data &data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (const reference variant)
Definition interface.h:246
Represents a thread of execution within the Goby middleware, interleaving periodic events (loop()) wi...
Definition thread.h:61
static constexpr goby::middleware::Group shutdown_group_
Definition thread.h:185
void set_transporter(InterThreadTransporter *transporter)
Definition thread.h:202
void thread_health(goby::middleware::protobuf::ThreadHealth &health)
Definition thread.h:223
Thread that simply publishes an empty message on its loop interval to TimerThread::expire_group.
TimerThread(const boost::units::quantity< boost::units::si::frequency > &freq)
static constexpr goby::middleware::Group expire_group
const ::goby::middleware::protobuf::LatLonPoint & datum() const
void set_state(::goby::middleware::protobuf::HealthState value)
bool is(goby::util::logger::Verbosity verbosity)
void set_lock_action(logger_lock::LockAction lock_action)
detail namespace with internal helper functions
Definition json.hpp:263
InterProcessPortalImplementation< InnerTransporter, middleware::InterProcessPortalBase, detail::InterProcessTag > InterProcessPortal
The global namespace for the Goby project.
util::FlexOstream glog
Access the Goby logger through this object.
uint32_t index(const std::array< int8_t, 256 > &rdata, char symbol)
Definition base.h:140