26#ifndef GOBY_MIDDLEWARE_APPLICATION_MULTI_THREAD_H
27#define GOBY_MIDDLEWARE_APPLICATION_MULTI_THREAD_H
29#include <boost/core/demangle.hpp>
30#include <boost/units/systems/si.hpp>
65 :
public Thread<boost::units::quantity<boost::units::si::frequency>, InterThreadTransporter>,
77 TimerThread(
const boost::units::quantity<boost::units::si::frequency>& freq)
/// Loop callback override: publishes an empty message to expire_group
/// through this thread's own InterThreadTransporter (interthread_).
84 void loop()
override { interthread_.template publish_empty<expire_group>(); }
/// Returns a mutable reference to the interthread_ member (the
/// InterThreadTransporter owned by this object).
86 InterThreadTransporter& interthread() {
return interthread_; }
89 InterThreadTransporter interthread_;
98template <
class Config,
class Transporter>
103 struct ThreadManagement
105 ThreadManagement() =
default;
111 goby::glog <<
"Joining thread: " << name << std::endl;
117 std::atomic<bool> alive{
true};
120 std::unique_ptr<std::thread> thread;
123 static std::exception_ptr thread_exception_;
125 std::map<std::type_index, std::map<int, ThreadManagement>> threads_;
127 int running_thread_count_{0};
133 _launch_thread<ThreadType, Config, false, true>(-1, this->
app_cfg());
137 _launch_thread<ThreadType, Config, true, true>(
index, this->
app_cfg());
140 template <
typename ThreadType,
typename ThreadConfig>
143 _launch_thread<ThreadType, ThreadConfig, false, true>(-1,
cfg);
145 template <
typename ThreadType,
typename ThreadConfig>
148 _launch_thread<ThreadType, ThreadConfig, true, true>(
index,
cfg);
153 _launch_thread<ThreadType, Config, false, false>(-1, this->
app_cfg());
157 _launch_thread<ThreadType, Config, true, false>(
index, this->
app_cfg());
163 auto type_i = std::type_index(
typeid(ThreadType));
168 if (threads_.count(type_i) && threads_[type_i].count(
index))
170 auto& thread_manager = threads_[type_i][
index];
176 void launch_timer(boost::units::quantity<boost::units::si::frequency> freq,
177 std::function<
void()> on_expire)
179 launch_thread<goby::middleware::TimerThread<i>>(freq);
181 .template subscribe_empty<goby::middleware::TimerThread<i>::expire_group>(on_expire);
/// Joins the timer thread of type goby::middleware::TimerThread<i>.
/// Thin wrapper: forwards to join_thread<TimerThread<i>>() with no
/// arguments, so join_thread's default index is used.
/// \tparam i integer selecting which TimerThread<i> instantiation to join
184 template <
int i>
void join_timer() { join_thread<goby::middleware::TimerThread<i>>(); }
198 interthread_.template subscribe<MainThreadBase::joinable_group_>(
200 { _join_thread(joinable.
type_i, joinable.
index); });
/// Returns a mutable reference to threads_, the registry of managed
/// threads: outer key is the thread's std::type_index, inner key is the
/// integer thread index, value is the ThreadManagement record.
208 std::map<std::type_index, std::map<int, ThreadManagement>>&
threads() {
return threads_; }
212 if (running_thread_count_ > 0)
215 goby::glog <<
"Requesting that all remaining threads shutdown cleanly..."
223 while (running_thread_count_ > 0)
226 << running_thread_count_
227 <<
" threads." << std::endl;
230 std::chrono::milliseconds(100));
245 catch (std::exception& e)
248 goby::glog <<
"MultiThreadApplicationBase:: uncaught exception: " << e.what()
/// Common implementation behind the public launch_thread* overloads.
/// Registers a ThreadManagement entry in threads_ under
/// (typeid(ThreadType), index) and starts a std::thread for it.
/// \tparam ThreadType   type of the thread object to create and run
/// \tparam ThreadConfig type of the configuration object passed as cfg
/// \tparam has_index    forwarded to detail::ThreadTypeSelector; the public
///                      overloads pass false together with index == -1
/// \tparam has_config   forwarded to detail::ThreadTypeSelector; false for
///                      the launch_thread_without_cfg overloads
/// \param index integer identifying this instance among threads of the same type
/// \param cfg   configuration for the new thread
/// \throw Exception if a thread of this type and index is already launched
///        and still marked alive
/// NOTE(review): only part of the out-of-line definition is visible here;
/// confirm the exact ThreadTypeSelector semantics against the full source.
254 template <
typename ThreadType,
typename ThreadConfig,
bool has_index,
bool has_config>
255 void _launch_thread(
int index,
const ThreadConfig&
cfg);
/// Joins the managed thread registered under (type_i, index).
/// When the entry still holds a std::thread, the definition clears its
/// alive flag, joins it, releases the handle and decrements
/// running_thread_count_; if thread_exception_ has been set it is rethrown
/// via std::rethrow_exception. When the handle is already gone, an
/// "Already joined thread" message is logged instead.
/// \param type_i std::type_index of the thread type to join
/// \param index  integer index of the thread instance to join
/// \throw Exception if no thread with this type and index is registered
/// NOTE(review): summarised from the partially visible definition; confirm
/// exact ordering against the full source.
257 void _join_thread(
const std::type_index& type_i,
int index);
267 Config, InterVehicleForwarder<InterProcessPortal<InterThreadTransporter>>>,
272 InterProcessPortal<InterThreadTransporter> interprocess_;
296 :
Base(loop_freq, &intervehicle_),
299 intervehicle_(interprocess_)
306 this->
interprocess().template subscribe<goby::middleware::groups::datum_update>(
310 {datum_update.
datum().lat_with_units(), datum_update.datum().lon_with_units()});
313 this->
interprocess().template publish<goby::middleware::groups::configuration>(
316 if (this->
app_cfg().app().health_cfg().run_health_monitor_thread())
318 typename InterProcessPortal<InterThreadTransporter>::implementation_tag>>();
/// Returns a mutable reference to interprocess_, the
/// InterProcessPortal<InterThreadTransporter> member of this application.
325 InterProcessPortal<InterThreadTransporter>&
interprocess() {
return interprocess_; }
328 return intervehicle_;
341 void preseed_hook(std::shared_ptr<protobuf::ProcessHealth>& health_response)
override
344 for (
const auto& type_map_p : this->
threads())
346 for (
const auto& index_manager_p : type_map_p.second)
348 const auto& thread_manager = index_manager_p.second;
349 auto&
thread_health = *health_response->mutable_main()->add_child();
362template <
class Config>
403 boost::units::quantity<boost::units::si::frequency> loop_freq = 0 * boost::units::si::hertz)
425 boost::units::quantity<boost::units::si::frequency> loop_freq,
int index = -1)
436 std::unique_ptr<InterThreadTransporter> interthread_;
441template <
class Config,
class Transporter>
445template <
class Config,
class Transporter>
446template <
typename ThreadType,
typename ThreadConfig,
bool has_index,
bool has_config>
448 int index,
const ThreadConfig& cfg)
450 std::type_index type_i = std::type_index(
typeid(ThreadType));
452 if (threads_[type_i].count(index) && threads_[type_i][index].alive)
453 throw(Exception(std::string(
"Thread of type: ") + type_i.name() +
" and index " +
454 std::to_string(index) +
" is already launched and running."));
456 auto& thread_manager = threads_[type_i][index];
457 thread_manager.alive =
true;
458 thread_manager.name = boost::core::demangle(
typeid(ThreadType).name());
460 thread_manager.name +=
"/" + std::to_string(index);
461 thread_manager.uid = thread_uid_++;
464 auto thread_lambda = [
this, type_i, index, cfg, &thread_manager]()
468 pthread_setname_np(thread_manager.name.c_str());
472 std::shared_ptr<ThreadType> goby_thread(
473 detail::ThreadTypeSelector<ThreadType, ThreadConfig, has_index, has_config>::thread(
476 goby_thread->set_name(thread_manager.name);
477 goby_thread->set_type_index(type_i);
478 goby_thread->set_uid(thread_manager.uid);
479 goby_thread->run(thread_manager.alive);
483 thread_exception_ = std::current_exception();
486 interthread_.
publish<MainThreadBase::joinable_group_>(ThreadIdentifier{type_i,
index});
489 thread_manager.thread = std::unique_ptr<std::thread>(
new std::thread(thread_lambda));
493 pthread_setname_np(thread_manager.thread->native_handle(), thread_manager.name.c_str());
496 ++running_thread_count_;
499template <
class Config,
class Transporter>
501 const std::type_index& type_i,
int index)
503 if (!threads_.count(type_i) || !threads_[type_i].count(index))
504 throw(Exception(std::string(
"No thread of type: ") + type_i.name() +
" and index " +
505 std::to_string(index) +
" to join."));
507 if (threads_[type_i][index].thread)
510 goby::glog <<
"Joining thread: " << type_i.name() <<
" index " <<
index << std::endl;
512 threads_[type_i][
index].alive =
false;
513 threads_[type_i][
index].thread->join();
514 threads_[type_i][
index].thread.reset();
515 --running_thread_count_;
518 goby::glog <<
"Joined thread: " << type_i.name() <<
" index " <<
index << std::endl;
520 if (thread_exception_)
524 <<
" had an uncaught exception" << std::endl;
525 std::rethrow_exception(thread_exception_);
531 goby::glog <<
"Already joined thread: " << type_i.name() <<
" index " <<
index
Base class for Goby applications. Generally you will want to use SingleThreadApplication or MultiThre...
void configure_geodesy(goby::util::UTMGeodesy::LatLonPoint datum)
boost::units::quantity< boost::units::si::frequency > choose_loop_freq(boost::units::quantity< boost::units::si::frequency > compiled_loop_freq)
const Config & app_cfg()
Accesses configuration object passed at launch.
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
InnerTransporter & inner()
A transporter for the interthread layer.
Implements the forwarder concept for the intervehicle layer.
Base class for creating multiple thread applications.
void launch_timer(boost::units::quantity< boost::units::si::frequency > freq, std::function< void()> on_expire)
void launch_thread_without_cfg(int index)
int running_thread_count()
std::map< std::type_index, std::map< int, ThreadManagement > > & threads()
void launch_thread(int index)
virtual ~MultiThreadApplicationBase()
void launch_thread(const ThreadConfig &cfg)
InterThreadTransporter & interthread()
void launch_thread(int index, const ThreadConfig &cfg)
void launch_thread_without_cfg()
MultiThreadApplicationBase(boost::units::quantity< boost::units::si::frequency > loop_freq, Transporter *transporter)
virtual void post_finalize() override
Called just after finalize.
void join_thread(int index=-1)
Base class for building multithreaded applications for a given implementation of the InterProcessPort...
InterVehicleForwarder< InterProcessPortal< InterThreadTransporter > > & intervehicle()
virtual void post_initialize() override
Assume all required subscriptions are done in the Constructor or in initialize(). If this isn't the c...
InterProcessPortal< InterThreadTransporter > & interprocess()
virtual void health(goby::middleware::protobuf::ThreadHealth &health) override
Called when HealthRequest is made by goby_coroner.
virtual ~MultiThreadApplication()
MultiThreadApplication(boost::units::quantity< boost::units::si::frequency > loop_freq)
Construct the application calling loop() at the given frequency (boost::units overload)
MultiThreadApplication(double loop_freq_hertz=0)
Construct the application calling loop() at the given frequency (double overload)
InterThreadTransporter & interthread()
Base class for building multithreaded Goby applications that do not perform any interprocess (or...
virtual ~MultiThreadStandaloneApplication()
MultiThreadStandaloneApplication(double loop_freq_hertz=0)
Construct the application calling loop() at the given frequency (double overload)
MultiThreadStandaloneApplication(boost::units::quantity< boost::units::si::frequency > loop_freq)
Construct the application calling loop() at the given frequency (boost::units overload)
Base class for building multithreaded Goby tests that do not perform any interprocess (or outer)...
InterThreadTransporter & intervehicle()
virtual ~MultiThreadTest()
MultiThreadTest(boost::units::quantity< boost::units::si::frequency > loop_freq=0 *boost::units::si::hertz)
Construct the test running at the given frequency.
InterThreadTransporter & interprocess()
Class for use with MultiThreadStandaloneApplication (interthread only)
StandaloneThread(const Config &cfg, boost::units::quantity< boost::units::si::frequency > loop_freq, int index=-1)
InterThreadTransporter & interthread()
StandaloneThread(const Config &cfg, double loop_freq_hertz=0, int index=-1)
void publish(const Data &data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (const reference variant)
Represents a thread of execution within the Goby middleware, interleaving periodic events (loop()) wi...
static constexpr goby::middleware::Group shutdown_group_
void set_transporter(InterThreadTransporter *transporter)
Transporter & transporter() const
void thread_health(goby::middleware::protobuf::ThreadHealth &health)
const Config & cfg() const
Thread that simply publishes an empty message on its loop interval to TimerThread::expire_group.
TimerThread(const boost::units::quantity< boost::units::si::frequency > &freq)
static constexpr goby::middleware::Group expire_group
const ::goby::middleware::protobuf::LatLonPoint & datum() const
void set_state(::goby::middleware::protobuf::HealthState value)
void subscribe_terminate(bool do_quit=true)
bool is(goby::util::logger::Verbosity verbosity)
void set_lock_action(logger_lock::LockAction lock_action)
detail namespace with internal helper functions
@ ERROR__THREAD_NOT_RESPONDING
InterProcessPortalImplementation< InnerTransporter, middleware::InterProcessPortalBase, detail::InterProcessTag > InterProcessPortal
The global namespace for the Goby project.
util::FlexOstream glog
Access the Goby logger through this object.
uint32_t index(const std::array< int8_t, 256 > &rdata, char symbol)