25#ifndef GOBY_MIDDLEWARE_APPLICATION_MULTI_THREAD_H
26#define GOBY_MIDDLEWARE_APPLICATION_MULTI_THREAD_H
28#include <boost/core/demangle.hpp>
29#include <boost/units/systems/si.hpp>
64 :
public Thread<boost::units::quantity<boost::units::si::frequency>, InterThreadTransporter>,
76 TimerThread(
const boost::units::quantity<boost::units::si::frequency>& freq)
83 void loop()
override { interthread_.template publish_empty<expire_group>(); }
85 InterThreadTransporter& interthread() {
return interthread_; }
88 InterThreadTransporter interthread_;
97template <
class Config,
class Transporter>
102 struct ThreadManagement
104 ThreadManagement() =
default;
110 goby::glog <<
"Joining thread: " << name << std::endl;
116 std::atomic<bool> alive{
true};
119 std::unique_ptr<std::thread> thread;
122 static std::exception_ptr thread_exception_;
124 std::map<std::type_index, std::map<int, ThreadManagement>> threads_;
126 int running_thread_count_{0};
132 _launch_thread<ThreadType, Config, false, true>(-1, this->
app_cfg());
136 _launch_thread<ThreadType, Config, true, true>(
index, this->
app_cfg());
139 template <
typename ThreadType,
typename ThreadConfig>
142 _launch_thread<ThreadType, ThreadConfig, false, true>(-1,
cfg);
144 template <
typename ThreadType,
typename ThreadConfig>
147 _launch_thread<ThreadType, ThreadConfig, true, true>(
index,
cfg);
152 _launch_thread<ThreadType, Config, false, false>(-1, this->
app_cfg());
156 _launch_thread<ThreadType, Config, true, false>(
index, this->
app_cfg());
162 auto type_i = std::type_index(
typeid(ThreadType));
167 if (threads_.count(type_i) && threads_[type_i].count(
index))
169 auto& thread_manager = threads_[type_i][
index];
175 void launch_timer(boost::units::quantity<boost::units::si::frequency> freq,
176 std::function<
void()> on_expire)
178 launch_thread<goby::middleware::TimerThread<i>>(freq);
180 .template subscribe_empty<goby::middleware::TimerThread<i>::expire_group>(on_expire);
183 template <
int i>
void join_timer() { join_thread<goby::middleware::TimerThread<i>>(); }
197 interthread_.template subscribe<MainThreadBase::joinable_group_>(
199 { _join_thread(joinable.
type_i, joinable.
index); });
207 std::map<std::type_index, std::map<int, ThreadManagement>>&
threads() {
return threads_; }
211 if (running_thread_count_ > 0)
214 goby::glog <<
"Requesting that all remaining threads shutdown cleanly..."
222 while (running_thread_count_ > 0)
225 << running_thread_count_
226 <<
" threads." << std::endl;
229 std::chrono::milliseconds(100));
244 catch (std::exception& e)
247 goby::glog <<
"MultiThreadApplicationBase:: uncaught exception: " << e.what()
253 template <
typename ThreadType,
typename ThreadConfig,
bool has_index,
bool has_config>
254 void _launch_thread(
int index,
const ThreadConfig&
cfg);
256 void _join_thread(
const std::type_index& type_i,
int index);
266 Config, InterVehicleForwarder<InterProcessPortal<InterThreadTransporter>>>,
271 InterProcessPortal<InterThreadTransporter> interprocess_;
295 :
Base(loop_freq, &intervehicle_),
298 intervehicle_(interprocess_)
305 this->
interprocess().template subscribe<goby::middleware::groups::datum_update>(
309 {datum_update.
datum().lat_with_units(), datum_update.datum().lon_with_units()});
312 this->
interprocess().template publish<goby::middleware::groups::configuration>(
315 if (this->
app_cfg().app().health_cfg().run_health_monitor_thread())
317 typename InterProcessPortal<InterThreadTransporter>::implementation_tag>>();
324 InterProcessPortal<InterThreadTransporter>&
interprocess() {
return interprocess_; }
327 return intervehicle_;
340 void preseed_hook(std::shared_ptr<protobuf::ProcessHealth>& health_response)
override
343 for (
const auto& type_map_p : this->
threads())
345 for (
const auto& index_manager_p : type_map_p.second)
347 const auto& thread_manager = index_manager_p.second;
348 auto&
thread_health = *health_response->mutable_main()->add_child();
361template <
class Config>
402 boost::units::quantity<boost::units::si::frequency> loop_freq = 0 * boost::units::si::hertz)
424 boost::units::quantity<boost::units::si::frequency> loop_freq,
int index = -1)
435 std::unique_ptr<InterThreadTransporter> interthread_;
440template <
class Config,
class Transporter>
444template <
class Config,
class Transporter>
445template <
typename ThreadType,
typename ThreadConfig,
bool has_index,
bool has_config>
447 int index,
const ThreadConfig& cfg)
449 std::type_index type_i = std::type_index(
typeid(ThreadType));
451 if (threads_[type_i].count(index) && threads_[type_i][index].alive)
452 throw(Exception(std::string(
"Thread of type: ") + type_i.name() +
" and index " +
453 std::to_string(index) +
" is already launched and running."));
455 auto& thread_manager = threads_[type_i][index];
456 thread_manager.alive =
true;
457 thread_manager.name = boost::core::demangle(
typeid(ThreadType).name());
459 thread_manager.name +=
"/" + std::to_string(index);
460 thread_manager.uid = thread_uid_++;
463 auto thread_lambda = [
this, type_i, index, cfg, &thread_manager]()
467 pthread_setname_np(thread_manager.name.c_str());
471 std::shared_ptr<ThreadType> goby_thread(
472 detail::ThreadTypeSelector<ThreadType, ThreadConfig, has_index, has_config>::thread(
475 goby_thread->set_name(thread_manager.name);
476 goby_thread->set_type_index(type_i);
477 goby_thread->set_uid(thread_manager.uid);
478 goby_thread->run(thread_manager.alive);
482 thread_exception_ = std::current_exception();
485 interthread_.
publish<MainThreadBase::joinable_group_>(ThreadIdentifier{type_i, index});
488 thread_manager.thread = std::unique_ptr<std::thread>(
new std::thread(thread_lambda));
492 pthread_setname_np(thread_manager.thread->native_handle(), thread_manager.name.c_str());
495 ++running_thread_count_;
498template <
class Config,
class Transporter>
500 const std::type_index& type_i,
int index)
502 if (!threads_.count(type_i) || !threads_[type_i].count(index))
503 throw(Exception(std::string(
"No thread of type: ") + type_i.name() +
" and index " +
504 std::to_string(index) +
" to join."));
506 if (threads_[type_i][index].thread)
509 goby::glog <<
"Joining thread: " << type_i.name() <<
" index " << index << std::endl;
511 threads_[type_i][index].alive =
false;
512 threads_[type_i][index].thread->join();
513 threads_[type_i][index].thread.reset();
514 --running_thread_count_;
517 goby::glog <<
"Joined thread: " << type_i.name() <<
" index " << index << std::endl;
519 if (thread_exception_)
522 goby::glog <<
"Thread type: " << type_i.name() <<
", index: " << index
523 <<
" had an uncaught exception" << std::endl;
524 std::rethrow_exception(thread_exception_);
530 goby::glog <<
"Already joined thread: " << type_i.name() <<
" index " << index
Base class for Goby applications. Generally you will want to use SingleThreadApplication or MultiThre...
void configure_geodesy(goby::util::UTMGeodesy::LatLonPoint datum)
boost::units::quantity< boost::units::si::frequency > choose_loop_freq(boost::units::quantity< boost::units::si::frequency > compiled_loop_freq)
const Config & app_cfg()
Accesses configuration object passed at launch.
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
InnerTransporter & inner()
A transporter for the interthread layer.
Implements the forwarder concept for the intervehicle layer.
Base class for creating multiple thread applications.
void launch_timer(boost::units::quantity< boost::units::si::frequency > freq, std::function< void()> on_expire)
void launch_thread_without_cfg(int index)
int running_thread_count()
std::map< std::type_index, std::map< int, ThreadManagement > > & threads()
void launch_thread(int index)
virtual ~MultiThreadApplicationBase()
void launch_thread(const ThreadConfig &cfg)
InterThreadTransporter & interthread()
void launch_thread(int index, const ThreadConfig &cfg)
void launch_thread_without_cfg()
MultiThreadApplicationBase(boost::units::quantity< boost::units::si::frequency > loop_freq, Transporter *transporter)
virtual void post_finalize() override
Called just after finalize.
void join_thread(int index=-1)
Base class for building multithreaded applications for a given implementation of the InterProcessPort...
InterVehicleForwarder< InterProcessPortal< InterThreadTransporter > > & intervehicle()
virtual void post_initialize() override
Assume all required subscriptions are done in the Constructor or in initialize(). If this isn't the c...
InterProcessPortal< InterThreadTransporter > & interprocess()
virtual void health(goby::middleware::protobuf::ThreadHealth &health) override
Called when HealthRequest is made by goby_coroner.
virtual ~MultiThreadApplication()
MultiThreadApplication(boost::units::quantity< boost::units::si::frequency > loop_freq)
Construct the application calling loop() at the given frequency (boost::units overload)
MultiThreadApplication(double loop_freq_hertz=0)
Construct the application calling loop() at the given frequency (double overload)
InterThreadTransporter & interthread()
Base class for building multithreaded Goby applications that do not have perform any interprocess (or...
virtual ~MultiThreadStandaloneApplication()
MultiThreadStandaloneApplication(double loop_freq_hertz=0)
Construct the application calling loop() at the given frequency (double overload)
MultiThreadStandaloneApplication(boost::units::quantity< boost::units::si::frequency > loop_freq)
Construct the application calling loop() at the given frequency (boost::units overload)
Base class for building multithreaded Goby tests that do not have perform any interprocess (or outer)...
InterThreadTransporter & intervehicle()
virtual ~MultiThreadTest()
MultiThreadTest(boost::units::quantity< boost::units::si::frequency > loop_freq=0 *boost::units::si::hertz)
Construct the test running at the given frequency.
InterThreadTransporter & interprocess()
Class for use with MultiThreadStandaloneApplication (interthread only)
StandaloneThread(const Config &cfg, boost::units::quantity< boost::units::si::frequency > loop_freq, int index=-1)
InterThreadTransporter & interthread()
StandaloneThread(const Config &cfg, double loop_freq_hertz=0, int index=-1)
void publish(const Data &data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (const reference variant)
Represents a thread of execution within the Goby middleware, interleaving periodic events (loop()) wi...
static constexpr goby::middleware::Group shutdown_group_
void set_transporter(InterThreadTransporter *transporter)
Transporter & transporter() const
void thread_health(goby::middleware::protobuf::ThreadHealth &health)
const Config & cfg() const
Thread that simply publishes an empty message on its loop interval to TimerThread::group.
TimerThread(const boost::units::quantity< boost::units::si::frequency > &freq)
static constexpr goby::middleware::Group expire_group
const ::goby::middleware::protobuf::LatLonPoint & datum() const
void set_state(::goby::middleware::protobuf::HealthState value)
void subscribe_terminate(bool do_quit=true)
bool is(goby::util::logger::Verbosity verbosity)
void set_lock_action(logger_lock::LockAction lock_action)
detail namespace with internal helper functions
@ ERROR__THREAD_NOT_RESPONDING
InterProcessPortalImplementation< InnerTransporter, middleware::InterProcessPortalBase, detail::InterProcessTag > InterProcessPortal
The global namespace for the Goby project.
util::FlexOstream glog
Access the Goby logger through this object.