Goby3 3.2.3
2025.05.13
Loading...
Searching...
No Matches
liaison_container.h
Go to the documentation of this file.
1// Copyright 2013-2025:
2// GobySoft, LLC (2013-)
3// Massachusetts Institute of Technology (2007-2014)
4// Community contributors (see AUTHORS file)
5// File authors:
6// Toby Schneider <toby@gobysoft.org>
7//
8//
9// This file is part of the Goby Underwater Autonomy Project Libraries
10// ("The Goby Libraries").
11//
12// The Goby Libraries are free software: you can redistribute them and/or modify
13// them under the terms of the GNU Lesser General Public License as published by
14// the Free Software Foundation, either version 2.1 of the License, or
15// (at your option) any later version.
16//
17// The Goby Libraries are distributed in the hope that they will be useful,
18// but WITHOUT ANY WARRANTY; without even the implied warranty of
19// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20// GNU Lesser General Public License for more details.
21//
22// You should have received a copy of the GNU Lesser General Public License
23// along with Goby. If not, see <http://www.gnu.org/licenses/>.
24
25#ifndef GOBY_ZEROMQ_LIAISON_LIAISON_CONTAINER_H
26#define GOBY_ZEROMQ_LIAISON_LIAISON_CONTAINER_H
27
28#include <queue>
29
30#include <Wt/WColor.h>
31#include <Wt/WContainerWidget.h>
32#include <Wt/WText.h>
33#include <Wt/WTimer.h>
34
36
40
41namespace goby
42{
43namespace zeromq
44{
45const Wt::WColor goby_blue(28, 159, 203);
46const Wt::WColor goby_orange(227, 96, 52);
47
49{
50 return "liaison_internal_publish_socket";
51}
53{
54 return "liaison_internal_subscribe_socket";
55}
56
57class LiaisonContainer : public Wt::WContainerWidget
58{
59 public:
61 {
62 setStyleClass("fill");
63 /* addWidget(new Wt::WText("<hr/>")); */
64 /* addWidget(name_); */
65 /* addWidget(new Wt::WText("<hr/>")); */
66 }
67
69 {
70 goby::glog.is_debug2() && goby::glog << "~LiaisonContainer(): " << name() << std::endl;
71 }
72
73 void set_name(const Wt::WString& name) { name_.setText(name); }
74
75 const Wt::WString& name() { return name_.text(); }
76
77 virtual void focus() {}
78 virtual void unfocus() {}
79 virtual void cleanup() {}
80
81 private:
82 Wt::WText name_;
83};
84
85template <typename Derived, typename GobyThread>
87{
88 public:
90 {
91 static std::atomic<int> index(0);
92 index_ = index++;
93
94 // copy configuration
95 auto thread_lambda = [this, cfg]()
96 {
97 {
98 std::lock_guard<std::mutex> l(goby_thread_mutex);
99 goby_thread_ =
100 std::make_unique<GobyThread>(static_cast<Derived*>(this), cfg, index_);
101 }
102
103 try
104 {
105 goby_thread_->run(thread_alive_);
106 }
107 catch (...)
108 {
109 thread_exception_ = std::current_exception();
110 }
111
112 {
113 std::lock_guard<std::mutex> l(goby_thread_mutex);
114 goby_thread_.reset();
115 }
116 };
117
118 thread_ = std::unique_ptr<std::thread>(new std::thread(thread_lambda));
119
120 // wait for thread to be created
121 while (goby_thread() == nullptr) usleep(1000);
122
123 comms_timer_.setInterval(
124 std::chrono::milliseconds(static_cast<long>(1 / cfg.update_freq() * 1.0e3)));
125 comms_timer_.timeout().connect([this](const Wt::WMouseEvent&)
126 { this->process_from_comms(); });
127 comms_timer_.start();
128 }
129
131 {
132 thread_alive_ = false;
133 thread_->join();
134
135 if (thread_exception_)
136 {
137 goby::glog.is_warn() && goby::glog << "Comms thread had an uncaught exception"
138 << std::endl;
139 std::rethrow_exception(thread_exception_);
140 }
141
142 goby::glog.is_debug2() && goby::glog << "~LiaisonContainerWithComms(): " << name()
143 << std::endl;
144 }
145
146 void post_to_wt(std::function<void()> func)
147 {
148 std::lock_guard<std::mutex> l(comms_to_wt_mutex);
149 comms_to_wt_queue.push(func);
150 }
151
153 {
154 std::lock_guard<std::mutex> l(wt_to_comms_mutex);
155 while (!wt_to_comms_queue.empty())
156 {
157 wt_to_comms_queue.front()();
158 wt_to_comms_queue.pop();
159 }
160 }
161
162 protected:
163 GobyThread* goby_thread()
164 {
165 std::lock_guard<std::mutex> l(goby_thread_mutex);
166 return goby_thread_.get();
167 }
168
169 void post_to_comms(std::function<void()> func)
170 {
171 std::lock_guard<std::mutex> l(wt_to_comms_mutex);
172 wt_to_comms_queue.push(func);
173 }
174
176 {
177 std::lock_guard<std::mutex> l(comms_to_wt_mutex);
178 while (!comms_to_wt_queue.empty())
179 {
180 comms_to_wt_queue.front()();
181 comms_to_wt_queue.pop();
182 }
183 }
184
185 void update_comms_freq(double hertz)
186 {
187 comms_timer_.stop();
188 comms_timer_.setInterval(std::chrono::milliseconds(static_cast<long>(1 / hertz * 1.0e3)));
189 comms_timer_.start();
190 }
191
192 private:
193 // for comms
194 std::mutex comms_to_wt_mutex;
195 std::queue<std::function<void()>> comms_to_wt_queue;
196 std::mutex wt_to_comms_mutex;
197 std::queue<std::function<void()>> wt_to_comms_queue;
198
199 // only protects the unique_ptr, not the underlying thread
200 std::mutex goby_thread_mutex;
201 std::unique_ptr<GobyThread> goby_thread_{nullptr};
202
203 int index_;
204 std::unique_ptr<std::thread> thread_;
205 std::atomic<bool> thread_alive_{true};
206 std::exception_ptr thread_exception_;
207
208 Wt::WTimer comms_timer_;
209};
210
211template <typename WtContainer>
213 : public goby::middleware::SimpleThread<goby::apps::zeromq::protobuf::LiaisonConfig>
214{
215 public:
216 LiaisonCommsThread(WtContainer* container,
218 : goby::middleware::SimpleThread<goby::apps::zeromq::protobuf::LiaisonConfig>(
219 config, config.update_freq() * boost::units::si::hertz, index),
220 container_(container)
221 {
222 }
223
224 void loop() override
225 {
226 // goby::glog.is_debug3() && goby::glog << "LiaisonCommsThread " << this->index() << " loop()"
227 // << std::endl;
228 container_->process_from_wt();
229 }
230
231 private:
232 WtContainer* container_;
233};
234} // namespace zeromq
235} // namespace goby
236#endif
Implements Thread for a three layer middleware setup ([ intervehicle [ interprocess [ interthread ] ]...
SimpleThread(const goby::apps::zeromq::protobuf::LiaisonConfig &cfg, double loop_freq_hertz=0, int index=-1)
Construct a thread with a given configuration, optionally a loop frequency and/or index.
LiaisonCommsThread(WtContainer *container, const goby::apps::zeromq::protobuf::LiaisonConfig &config, int index)
LiaisonContainerWithComms(const goby::apps::zeromq::protobuf::LiaisonConfig &cfg)
void post_to_wt(std::function< void()> func)
void post_to_comms(std::function< void()> func)
void set_name(const Wt::WString &name)
const Wt::WColor goby_blue(28, 159, 203)
const Wt::WColor goby_orange(227, 96, 52)
std::string liaison_internal_publish_socket_name()
std::string liaison_internal_subscribe_socket_name()
The global namespace for the Goby project.
util::FlexOstream glog
Access the Goby logger through this object.