25#ifndef GOBY_ZEROMQ_LIAISON_LIAISON_CONTAINER_H
26#define GOBY_ZEROMQ_LIAISON_LIAISON_CONTAINER_H
31#include <Wt/WContainerWidget.h>
50 return "liaison_internal_publish_socket";
54 return "liaison_internal_subscribe_socket";
62 setStyleClass(
"fill");
75 const Wt::WString&
name() {
return name_.text(); }
85template <
typename Derived,
typename GobyThread>
91 static std::atomic<int> index(0);
95 auto thread_lambda = [
this, cfg]()
98 std::lock_guard<std::mutex> l(goby_thread_mutex);
100 std::make_unique<GobyThread>(
static_cast<Derived*
>(
this), cfg, index_);
105 goby_thread_->run(thread_alive_);
109 thread_exception_ = std::current_exception();
113 std::lock_guard<std::mutex> l(goby_thread_mutex);
114 goby_thread_.reset();
118 thread_ = std::unique_ptr<std::thread>(
new std::thread(thread_lambda));
123 comms_timer_.setInterval(
124 std::chrono::milliseconds(
static_cast<long>(1 / cfg.
update_freq() * 1.0e3)));
125 comms_timer_.timeout().connect([
this](
const Wt::WMouseEvent&)
127 comms_timer_.start();
132 thread_alive_ =
false;
135 if (thread_exception_)
139 std::rethrow_exception(thread_exception_);
148 std::lock_guard<std::mutex> l(comms_to_wt_mutex);
149 comms_to_wt_queue.push(func);
154 std::lock_guard<std::mutex> l(wt_to_comms_mutex);
155 while (!wt_to_comms_queue.empty())
157 wt_to_comms_queue.front()();
158 wt_to_comms_queue.pop();
165 std::lock_guard<std::mutex> l(goby_thread_mutex);
166 return goby_thread_.get();
171 std::lock_guard<std::mutex> l(wt_to_comms_mutex);
172 wt_to_comms_queue.push(func);
177 std::lock_guard<std::mutex> l(comms_to_wt_mutex);
178 while (!comms_to_wt_queue.empty())
180 comms_to_wt_queue.front()();
181 comms_to_wt_queue.pop();
188 comms_timer_.setInterval(std::chrono::milliseconds(
static_cast<long>(1 / hertz * 1.0e3)));
189 comms_timer_.start();
194 std::mutex comms_to_wt_mutex;
195 std::queue<std::function<void()>> comms_to_wt_queue;
196 std::mutex wt_to_comms_mutex;
197 std::queue<std::function<void()>> wt_to_comms_queue;
200 std::mutex goby_thread_mutex;
201 std::unique_ptr<GobyThread> goby_thread_{
nullptr};
204 std::unique_ptr<std::thread> thread_;
205 std::atomic<bool> thread_alive_{
true};
206 std::exception_ptr thread_exception_;
208 Wt::WTimer comms_timer_;
211template <
typename WtContainer>
219 config, config.update_freq() *
boost::units::si::hertz,
index),
220 container_(container)
228 container_->process_from_wt();
232 WtContainer* container_;
float update_freq() const
Implements Thread for a three layer middleware setup ([ intervehicle [ interprocess [ interthread ] ]...
SimpleThread(const goby::apps::zeromq::protobuf::LiaisonConfig &cfg, double loop_freq_hertz=0, int index=-1)
Construct a thread with a given configuration, optionally a loop frequency and/or index.
LiaisonCommsThread(WtContainer *container, const goby::apps::zeromq::protobuf::LiaisonConfig &config, int index)
LiaisonContainerWithComms(const goby::apps::zeromq::protobuf::LiaisonConfig &cfg)
void update_comms_freq(double hertz)
virtual ~LiaisonContainerWithComms()
GobyThread * goby_thread()
void post_to_wt(std::function< void()> func)
void process_from_comms()
void post_to_comms(std::function< void()> func)
void set_name(const Wt::WString &name)
virtual ~LiaisonContainer()
const Wt::WString & name()
const Wt::WColor goby_blue(28, 159, 203)
const Wt::WColor goby_orange(227, 96, 52)
std::string liaison_internal_publish_socket_name()
std::string liaison_internal_subscribe_socket_name()
The global namespace for the Goby project.
util::FlexOstream glog
Access the Goby logger through this object.