Goby3  3.1.4
2024.02.22
liaison_container.h
Go to the documentation of this file.
1 // Copyright 2013-2022:
2 // GobySoft, LLC (2013-)
3 // Massachusetts Institute of Technology (2007-2014)
4 // Community contributors (see AUTHORS file)
5 // File authors:
6 // Toby Schneider <toby@gobysoft.org>
7 //
8 //
9 // This file is part of the Goby Underwater Autonomy Project Libraries
10 // ("The Goby Libraries").
11 //
12 // The Goby Libraries are free software: you can redistribute them and/or modify
13 // them under the terms of the GNU Lesser General Public License as published by
14 // the Free Software Foundation, either version 2.1 of the License, or
15 // (at your option) any later version.
16 //
17 // The Goby Libraries are distributed in the hope that they will be useful,
18 // but WITHOUT ANY WARRANTY; without even the implied warranty of
19 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 // GNU Lesser General Public License for more details.
21 //
22 // You should have received a copy of the GNU Lesser General Public License
23 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
24 
25 #ifndef GOBY_ZEROMQ_LIAISON_LIAISON_CONTAINER_H
26 #define GOBY_ZEROMQ_LIAISON_LIAISON_CONTAINER_H
27 
28 #include <queue>
29 
30 #include <Wt/WColor>
31 #include <Wt/WContainerWidget>
32 #include <Wt/WText>
33 #include <Wt/WTimer>
34 
36 
37 #include "goby/middleware/group.h"
40 
41 namespace goby
42 {
43 namespace zeromq
44 {
45 const Wt::WColor goby_blue(28, 159, 203);
46 const Wt::WColor goby_orange(227, 96, 52);
47 
/// Returns the fixed identifier string "liaison_internal_publish_socket".
///
/// Declared `inline` because the function is defined in a header and would
/// otherwise be defined once per translation unit that includes it.
inline std::string liaison_internal_publish_socket_name()
{
    return "liaison_internal_publish_socket";
}
/// Returns the fixed identifier string "liaison_internal_subscribe_socket".
///
/// Declared `inline` because the function is defined in a header and would
/// otherwise be defined once per translation unit that includes it.
inline std::string liaison_internal_subscribe_socket_name()
{
    return "liaison_internal_subscribe_socket";
}
56 
57 class LiaisonContainer : public Wt::WContainerWidget
58 {
59  public:
61  {
62  setStyleClass("fill");
63  /* addWidget(new Wt::WText("<hr/>")); */
64  /* addWidget(name_); */
65  /* addWidget(new Wt::WText("<hr/>")); */
66  }
67 
69  {
70  goby::glog.is_debug2() && goby::glog << "~LiaisonContainer(): " << name() << std::endl;
71  }
72 
73  void set_name(const Wt::WString& name) { name_.setText(name); }
74 
75  const Wt::WString& name() { return name_.text(); }
76 
77  virtual void focus() {}
78  virtual void unfocus() {}
79  virtual void cleanup() {}
80 
81  private:
82  Wt::WText name_;
83 };
84 
85 template <typename Derived, typename GobyThread>
87 {
88  public:
90  {
91  static std::atomic<int> index(0);
92  index_ = index++;
93 
94  // copy configuration
95  auto thread_lambda = [this, cfg]() {
96  {
97  std::lock_guard<std::mutex> l(goby_thread_mutex);
98  goby_thread_ =
99  std::make_unique<GobyThread>(static_cast<Derived*>(this), cfg, index_);
100  }
101 
102  try
103  {
104  goby_thread_->run(thread_alive_);
105  }
106  catch (...)
107  {
108  thread_exception_ = std::current_exception();
109  }
110 
111  {
112  std::lock_guard<std::mutex> l(goby_thread_mutex);
113  goby_thread_.reset();
114  }
115  };
116 
117  thread_ = std::unique_ptr<std::thread>(new std::thread(thread_lambda));
118 
119  // wait for thread to be created
120  while (goby_thread() == nullptr) usleep(1000);
121 
122  comms_timer_.setInterval(1 / cfg.update_freq() * 1.0e3);
123  comms_timer_.timeout().connect(
124  [this](const Wt::WMouseEvent&) { this->process_from_comms(); });
125  comms_timer_.start();
126  }
127 
129  {
130  thread_alive_ = false;
131  thread_->join();
132 
133  if (thread_exception_)
134  {
135  goby::glog.is_warn() && goby::glog << "Comms thread had an uncaught exception"
136  << std::endl;
137  std::rethrow_exception(thread_exception_);
138  }
139 
140  goby::glog.is_debug2() && goby::glog << "~LiaisonContainerWithComms(): " << name()
141  << std::endl;
142  }
143 
144  void post_to_wt(std::function<void()> func)
145  {
146  std::lock_guard<std::mutex> l(comms_to_wt_mutex);
147  comms_to_wt_queue.push(func);
148  }
149 
151  {
152  std::lock_guard<std::mutex> l(wt_to_comms_mutex);
153  while (!wt_to_comms_queue.empty())
154  {
155  wt_to_comms_queue.front()();
156  wt_to_comms_queue.pop();
157  }
158  }
159 
160  protected:
161  GobyThread* goby_thread()
162  {
163  std::lock_guard<std::mutex> l(goby_thread_mutex);
164  return goby_thread_.get();
165  }
166 
167  void post_to_comms(std::function<void()> func)
168  {
169  std::lock_guard<std::mutex> l(wt_to_comms_mutex);
170  wt_to_comms_queue.push(func);
171  }
172 
174  {
175  std::lock_guard<std::mutex> l(comms_to_wt_mutex);
176  while (!comms_to_wt_queue.empty())
177  {
178  comms_to_wt_queue.front()();
179  comms_to_wt_queue.pop();
180  }
181  }
182 
183  void update_comms_freq(double hertz)
184  {
185  comms_timer_.stop();
186  comms_timer_.setInterval(1 / hertz * 1.0e3);
187  comms_timer_.start();
188  }
189 
190  private:
191  // for comms
192  std::mutex comms_to_wt_mutex;
193  std::queue<std::function<void()>> comms_to_wt_queue;
194  std::mutex wt_to_comms_mutex;
195  std::queue<std::function<void()>> wt_to_comms_queue;
196 
197  // only protects the unique_ptr, not the underlying thread
198  std::mutex goby_thread_mutex;
199  std::unique_ptr<GobyThread> goby_thread_{nullptr};
200 
201  int index_;
202  std::unique_ptr<std::thread> thread_;
203  std::atomic<bool> thread_alive_{true};
204  std::exception_ptr thread_exception_;
205 
206  Wt::WTimer comms_timer_;
207 };
208 
209 template <typename WtContainer>
211  : public goby::middleware::SimpleThread<goby::apps::zeromq::protobuf::LiaisonConfig>
212 {
213  public:
214  LiaisonCommsThread(WtContainer* container,
216  : goby::middleware::SimpleThread<goby::apps::zeromq::protobuf::LiaisonConfig>(
217  config, config.update_freq() * boost::units::si::hertz, index),
218  container_(container)
219  {
220  }
221 
222  void loop() override
223  {
224  // goby::glog.is_debug3() && goby::glog << "LiaisonCommsThread " << this->index() << " loop()"
225  // << std::endl;
226  container_->process_from_wt();
227  }
228 
229  private:
230  WtContainer* container_;
231 };
232 } // namespace zeromq
233 } // namespace goby
234 #endif
goby::zeromq::LiaisonCommsThread
Definition: liaison_container.h:210
goby::zeromq::liaison_internal_publish_socket_name
std::string liaison_internal_publish_socket_name()
Definition: liaison_container.h:48
goby
The global namespace for the Goby project.
Definition: acomms_constants.h:33
goby::util::FlexOstream::is_warn
bool is_warn()
Definition: flex_ostream.h:82
goby::zeromq::LiaisonContainer::LiaisonContainer
LiaisonContainer()
Definition: liaison_container.h:60
goby::zeromq::LiaisonContainer::set_name
void set_name(const Wt::WString &name)
Definition: liaison_container.h:73
goby::zeromq::LiaisonContainer::focus
virtual void focus()
Definition: liaison_container.h:77
group.h
protobuf.h
goby::acomms::abc::protobuf::config
extern ::google::protobuf::internal::ExtensionIdentifier< ::goby::acomms::protobuf::DriverConfig, ::google::protobuf::internal::MessageTypeTraits< ::goby::acomms::abc::protobuf::Config >, 11, false > config
Definition: abc_driver.pb.h:203
goby::util::logger::mutex
std::recursive_mutex mutex
goby::middleware::Thread< goby::apps::zeromq::protobuf::LiaisonConfig, InterVehicleForwarder< InterProcessForwarder< InterThreadTransporter > > >::index
int index() const
Definition: thread.h:145
goby::zeromq::LiaisonCommsThread::LiaisonCommsThread
LiaisonCommsThread(WtContainer *container, const goby::apps::zeromq::protobuf::LiaisonConfig &config, int index)
Definition: liaison_container.h:214
boost
Definition: udp_driver.h:41
goby::zeromq::liaison_internal_subscribe_socket_name
std::string liaison_internal_subscribe_socket_name()
Definition: liaison_container.h:52
goby::zeromq::LiaisonContainerWithComms
Definition: liaison_container.h:86
multi_thread.h
goby::util::FlexOstream::is_debug2
bool is_debug2()
Definition: flex_ostream.h:85
detail::void
j template void())
Definition: json.hpp:4822
goby::apps::zeromq::protobuf::LiaisonConfig::update_freq
float update_freq() const
Definition: liaison_config.pb.h:3296
goby::zeromq::LiaisonContainerWithComms::process_from_wt
void process_from_wt()
Definition: liaison_container.h:150
goby::zeromq::LiaisonContainerWithComms::process_from_comms
void process_from_comms()
Definition: liaison_container.h:173
goby::zeromq::LiaisonContainerWithComms::post_to_comms
void post_to_comms(std::function< void()> func)
Definition: liaison_container.h:167
goby::middleware::SimpleThread
Implements Thread for a three layer middleware setup ([ intervehicle [ interprocess [ interthread ] ]...
Definition: simple_thread.h:43
goby::zeromq::LiaisonContainerWithComms::LiaisonContainerWithComms
LiaisonContainerWithComms(const goby::apps::zeromq::protobuf::LiaisonConfig &cfg)
Definition: liaison_container.h:89
goby::zeromq::goby_blue
const Wt::WColor goby_blue(28, 159, 203)
goby::zeromq::LiaisonContainer::cleanup
virtual void cleanup()
Definition: liaison_container.h:79
goby::zeromq::LiaisonContainerWithComms::post_to_wt
void post_to_wt(std::function< void()> func)
Definition: liaison_container.h:144
goby::zeromq::LiaisonContainer::~LiaisonContainer
virtual ~LiaisonContainer()
Definition: liaison_container.h:68
goby::middleware::SimpleThread< goby::apps::zeromq::protobuf::LiaisonConfig >::SimpleThread
SimpleThread(const goby::apps::zeromq::protobuf::LiaisonConfig &cfg, double loop_freq_hertz=0, int index=-1)
Construct a thread with a given configuration, optionally a loop frequency and/or index.
Definition: simple_thread.h:58
goby::zeromq::goby_orange
const Wt::WColor goby_orange(227, 96, 52)
goby::zeromq::LiaisonContainer::name
const Wt::WString & name()
Definition: liaison_container.h:75
goby::zeromq::LiaisonContainer::unfocus
virtual void unfocus()
Definition: liaison_container.h:78
goby::glog
util::FlexOstream glog
Access the Goby logger through this object.
goby::zeromq::LiaisonCommsThread::loop
void loop() override
Definition: liaison_container.h:222
goby::zeromq::LiaisonContainerWithComms::~LiaisonContainerWithComms
virtual ~LiaisonContainerWithComms()
Definition: liaison_container.h:128
goby::apps::zeromq::protobuf::LiaisonConfig
Definition: liaison_config.pb.h:206
goby::zeromq::LiaisonContainerWithComms::update_comms_freq
void update_comms_freq(double hertz)
Definition: liaison_container.h:183
goby::zeromq::LiaisonContainerWithComms::goby_thread
GobyThread * goby_thread()
Definition: liaison_container.h:161
goby::zeromq::LiaisonContainer
Definition: liaison_container.h:57
liaison_config.pb.h