Goby3 3.2.3
2025.05.13
Loading...
Searching...
No Matches
io_interface.h
Go to the documentation of this file.
1// Copyright 2019-2022:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6//
7//
8// This file is part of the Goby Underwater Autonomy Project Libraries
9// ("The Goby Libraries").
10//
11// The Goby Libraries are free software: you can redistribute them and/or modify
12// them under the terms of the GNU Lesser General Public License as published by
13// the Free Software Foundation, either version 2.1 of the License, or
14// (at your option) any later version.
15//
16// The Goby Libraries are distributed in the hope that they will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU Lesser General Public License for more details.
20//
21// You should have received a copy of the GNU Lesser General Public License
22// along with Goby. If not, see <http://www.gnu.org/licenses/>.
23
24#ifndef GOBY_MIDDLEWARE_IO_DETAIL_IO_INTERFACE_H
25#define GOBY_MIDDLEWARE_IO_DETAIL_IO_INTERFACE_H
26
27#include <chrono> // for seconds
28#include <exception> // for exception
29#include <memory> // for shared_ptr
30#include <mutex> // for mutex, lock_...
31#include <ostream> // for endl, size_t
32#include <string> // for string, oper...
33#include <thread> // for thread
34#include <unistd.h> // for usleep
35
36#include <boost/asio/write.hpp> // for async_write
37#include <boost/system/error_code.hpp> // for error_code
38
39#include "goby/exception.h" // for Exception
40#include "goby/middleware/application/multi_thread.h" // for SimpleThread
41#include "goby/middleware/common.h" // for thread_id
42#include "goby/middleware/io/groups.h" // for status
43#include "goby/middleware/protobuf/io.pb.h" // for IOError, IOS...
44#include "goby/time/steady_clock.h" // for SteadyClock
46#include "goby/util/debug_logger.h" // for glog
47
48#include "io_transporters.h"
49
50namespace goby
51{
52namespace middleware
53{
54class Group;
55class InterThreadTransporter;
56template <typename InnerTransporter> class InterProcessForwarder;
57namespace io
58{
59enum class ThreadState
60{
62};
63
64namespace detail
65{
66template <typename ProtobufEndpoint, typename ASIOEndpoint>
67ProtobufEndpoint endpoint_convert(const ASIOEndpoint& asio_ep)
68{
69 ProtobufEndpoint pb_ep;
70 pb_ep.set_addr(asio_ep.address().to_string());
71 pb_ep.set_port(asio_ep.port());
72 return pb_ep;
73}
74
75template <const goby::middleware::Group& line_in_group,
76 const goby::middleware::Group& line_out_group, PubSubLayer publish_layer,
77 PubSubLayer subscribe_layer, typename IOConfig, typename SocketType,
78 template <class> class ThreadType, bool use_indexed_groups = false>
79class IOThread : public ThreadType<IOConfig>,
81 IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer,
82 IOConfig, SocketType, ThreadType, use_indexed_groups>,
83 line_in_group, publish_layer, use_indexed_groups>,
85 IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer,
86 IOConfig, SocketType, ThreadType, use_indexed_groups>,
87 line_out_group, subscribe_layer, use_indexed_groups>
88{
89 public:
94 IOThread(const IOConfig& config, int index, std::string glog_group = "i/o")
95 : ThreadType<IOConfig>(config, this->loop_max_frequency(), index),
97 IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig,
98 SocketType, ThreadType, use_indexed_groups>,
99 line_in_group, publish_layer, use_indexed_groups>(index),
101 IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig,
102 SocketType, ThreadType, use_indexed_groups>,
103 line_out_group, subscribe_layer, use_indexed_groups>(index),
104 glog_group_(glog_group + " / t" + std::to_string(goby::middleware::gettid())),
105 thread_name_(glog_group)
106 {
107 auto data_out_callback =
108 [this](std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg) {
109 if (!io_msg->has_index() || io_msg->index() == this->index())
110 {
111 write(io_msg);
112 }
113 };
114
115 this->template subscribe_out<goby::middleware::protobuf::IOData>(data_out_callback);
116
117 if (!glog_group_added_)
118 {
120 glog_group_added_ = true;
121 }
122 }
123
124 void initialize() override
125 {
126 // thread to handle synchonization between boost::asio and goby condition_variable signaling
127 incoming_mail_notify_thread_.reset(new std::thread([this]() {
128 while (this->alive())
129 {
130 std::unique_lock<std::mutex> lock(incoming_mail_notify_mutex_);
131 this->interthread().cv()->wait(lock);
132 // post empty handler to cause loop() to return and allow incoming mail to be handled
133 io_.post([]() {});
134 }
135 }));
136
137 this->set_name(thread_name_);
138 }
139
140 void finalize() override
141 {
142 // join incoming mail thread
143 {
144 std::lock_guard<std::mutex> l(incoming_mail_notify_mutex_);
145 this->interthread().cv()->notify_all();
146 }
147 incoming_mail_notify_thread_->join();
148 incoming_mail_notify_thread_.reset();
149 }
150
151 virtual ~IOThread()
152 {
153 socket_.reset();
154
155 // for non clean shutdown, avoid abort
156 if (incoming_mail_notify_thread_)
157 incoming_mail_notify_thread_->detach();
158
159 auto status = std::make_shared<protobuf::IOStatus>();
160 status->set_state(protobuf::IO__LINK_CLOSED);
161
162 this->publish_in(status);
163 this->template unsubscribe_out<goby::middleware::protobuf::IOData>();
164 }
165
166 template <class IOThreadImplementation>
167 friend void basic_async_write(IOThreadImplementation* this_thread,
168 std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg);
169
170 protected:
171 void write(std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg)
172 {
174 goby::glog << group(glog_group_) << "(" << io_msg->data().size() << "B) <"
175 << ((this->index() == -1) ? std::string() : std::to_string(this->index()))
176 << " " << io_msg->ShortDebugString() << std::endl;
177 if (io_msg->data().empty())
178 return;
179 if (!socket_ || !socket_->is_open())
180 return;
181
182 this->async_write(io_msg);
183 }
184
185 void handle_read_success(std::size_t bytes_transferred, const std::string& bytes)
186 {
187 auto io_msg = std::make_shared<goby::middleware::protobuf::IOData>();
188 *io_msg->mutable_data() = bytes;
189
190 handle_read_success(bytes_transferred, io_msg);
191 }
192
193 void handle_read_success(std::size_t bytes_transferred,
194 std::shared_ptr<goby::middleware::protobuf::IOData> io_msg)
195 {
196 if (this->index() != -1)
197 io_msg->set_index(this->index());
198
200 goby::glog << group(glog_group_) << "(" << bytes_transferred << "B) >"
201 << ((this->index() == -1) ? std::string() : std::to_string(this->index()))
202 << " " << io_msg->ShortDebugString() << std::endl;
203
204 this->publish_in(io_msg);
205 }
206
207 void handle_write_success(std::size_t bytes_transferred) {}
208 void handle_read_error(const boost::system::error_code& ec);
209 void handle_write_error(const boost::system::error_code& ec);
210
212 SocketType& mutable_socket()
213 {
214 if (socket_)
215 return *socket_;
216 else
217 throw goby::Exception("Attempted to access null socket/serial_port");
218 }
219
221
223 bool socket_is_open() { return socket_ && socket_->is_open(); }
224
226 virtual void open_socket() = 0;
227
229 virtual void async_read() = 0;
230
232 virtual void async_write(std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg) = 0;
233
234 const std::string& glog_group() { return glog_group_; }
235
236 private:
238 void try_open();
239
241 void loop() override;
242
243 private:
245 std::unique_ptr<SocketType> socket_;
246
247 const goby::time::SteadyClock::duration min_backoff_interval_{std::chrono::seconds(1)};
248 const goby::time::SteadyClock::duration max_backoff_interval_{std::chrono::seconds(128)};
249 goby::time::SteadyClock::duration backoff_interval_{min_backoff_interval_};
251
252 std::mutex incoming_mail_notify_mutex_;
253 std::unique_ptr<std::thread> incoming_mail_notify_thread_;
254
255 std::string glog_group_;
256 std::string thread_name_;
257 bool glog_group_added_{false};
258};
259
260template <class IOThreadImplementation>
261void basic_async_write(IOThreadImplementation* this_thread,
262 std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg)
263{
264 boost::asio::async_write(
265 this_thread->mutable_socket(), boost::asio::buffer(io_msg->data()),
266 // capture io_msg in callback to ensure write buffer exists until async_write is done
267 [this_thread, io_msg](const boost::system::error_code& ec, std::size_t bytes_transferred) {
268 if (!ec && bytes_transferred > 0)
269 {
270 this_thread->handle_write_success(bytes_transferred);
271 }
272 else
273 {
274 this_thread->handle_write_error(ec);
275 }
276 });
277}
278
279} // namespace detail
280} // namespace io
281} // namespace middleware
282} // namespace goby
283
284template <const goby::middleware::Group& line_in_group,
285 const goby::middleware::Group& line_out_group,
287 goby::middleware::io::PubSubLayer subscribe_layer, typename IOConfig, typename SocketType,
288 template <class> class ThreadType, bool use_indexed_groups>
289void goby::middleware::io::detail::IOThread<line_in_group, line_out_group, publish_layer,
290 subscribe_layer, IOConfig, SocketType, ThreadType,
291 use_indexed_groups>::try_open()
292{
293 try
294 {
295 socket_.reset(new SocketType(io_));
296 open_socket();
297
298 // messages read from the socket
299 this->async_read();
300
301 // reset io_context, which ran out of work
302 io_.reset();
303
304 // successful, reset backoff
305 backoff_interval_ = min_backoff_interval_;
306
307 auto status = std::make_shared<protobuf::IOStatus>();
308 if (this->index() != -1)
309 status->set_index(this->index());
310
311 status->set_state(protobuf::IO__LINK_OPEN);
312 this->publish_in(status);
313
314 goby::glog.is_debug2() && goby::glog << group(glog_group_) << "Successfully opened socket"
315 << std::endl;
316
317 // update to avoid thrashing on open success but read/write failure
318 decltype(next_open_attempt_) now(goby::time::SteadyClock::now());
319 next_open_attempt_ = now + backoff_interval_;
320 }
321 catch (const std::exception& e)
322 {
323 auto status = std::make_shared<protobuf::IOStatus>();
324 if (this->index() != -1)
325 status->set_index(this->index());
326
327 status->set_state(protobuf::IO__CRITICAL_FAILURE);
330 error.set_text(e.what() + std::string(": config (") + this->cfg().ShortDebugString() + ")");
331 this->publish_in(status);
332
333 goby::glog.is_warn() && goby::glog << group(glog_group_)
334 << "Failed to open/configure socket/serial_port: "
335 << error.ShortDebugString() << std::endl;
336
337 if (backoff_interval_ < max_backoff_interval_)
338 backoff_interval_ *= 2.0;
339
340 decltype(next_open_attempt_) now(goby::time::SteadyClock::now());
341 next_open_attempt_ = now + backoff_interval_;
342
343 goby::glog.is_warn() && goby::glog << group(glog_group_) << "Will retry in "
344 << backoff_interval_ / std::chrono::seconds(1)
345 << " seconds" << std::endl;
346 socket_.reset();
347 }
348}
349
350template <const goby::middleware::Group& line_in_group,
351 const goby::middleware::Group& line_out_group,
353 goby::middleware::io::PubSubLayer subscribe_layer, typename IOConfig, typename SocketType,
354 template <class> class ThreadType, bool use_indexed_groups>
355void goby::middleware::io::detail::IOThread<line_in_group, line_out_group, publish_layer,
356 subscribe_layer, IOConfig, SocketType, ThreadType,
357 use_indexed_groups>::loop()
358{
359 if (socket_ && socket_->is_open())
360 {
361 // run the io service (blocks until either we read something
362 // from the socket or a subscription is available
363 // as signaled from an empty handler in the incoming_mail_notify_thread)
364 io_.run_one();
365 }
366 else
367 {
368 decltype(next_open_attempt_) now(goby::time::SteadyClock::now());
369 if (now > next_open_attempt_)
370 {
371 try_open();
372 }
373 else
374 {
375 // poll in case we need to quit
376 io_.poll();
377 usleep(100000); // avoid pegging CPU while waiting to attempt reopening socket
378 }
379 }
380}
381
382template <const goby::middleware::Group& line_in_group,
383 const goby::middleware::Group& line_out_group,
385 goby::middleware::io::PubSubLayer subscribe_layer, typename IOConfig, typename SocketType,
386 template <class> class ThreadType, bool use_indexed_groups>
388 line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig, SocketType, ThreadType,
389 use_indexed_groups>::handle_read_error(const boost::system::error_code& ec)
390{
391 auto status = std::make_shared<protobuf::IOStatus>();
392 if (this->index() != -1)
393 status->set_index(this->index());
394
395 status->set_state(protobuf::IO__CRITICAL_FAILURE);
396 goby::middleware::protobuf::IOError& error = *status->mutable_error();
398 error.set_text(ec.message());
399 this->publish_in(status);
400
401 goby::glog.is_warn() && goby::glog << group(glog_group_)
402 << "Failed to read from the socket/serial_port: "
403 << error.ShortDebugString() << std::endl;
404
405 socket_.reset();
406}
407
408template <const goby::middleware::Group& line_in_group,
409 const goby::middleware::Group& line_out_group,
411 goby::middleware::io::PubSubLayer subscribe_layer, typename IOConfig, typename SocketType,
412 template <class> class ThreadType, bool use_indexed_groups>
414 line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig, SocketType, ThreadType,
415 use_indexed_groups>::handle_write_error(const boost::system::error_code& ec)
416{
417 auto status = std::make_shared<protobuf::IOStatus>();
418 if (this->index() != -1)
419 status->set_index(this->index());
420
421 status->set_state(protobuf::IO__CRITICAL_FAILURE);
422 goby::middleware::protobuf::IOError& error = *status->mutable_error();
424 error.set_text(ec.message());
425 this->publish_in(status);
426
427 goby::glog.is_warn() && goby::glog << group(glog_group_)
428 << "Failed to write to the socket/serial_port: "
429 << error.ShortDebugString() << std::endl;
430 socket_.reset();
431}
432
433#endif
simple exception class for goby applications
Definition exception.h:35
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:60
void write(std::shared_ptr< const goby::middleware::protobuf::IOData > io_msg)
void handle_read_success(std::size_t bytes_transferred, const std::string &bytes)
virtual void async_write(std::shared_ptr< const goby::middleware::protobuf::IOData > io_msg)=0
Starts an asynchronous write from data published.
bool socket_is_open()
Does the socket exist and is it open?
void handle_read_error(const boost::system::error_code &ec)
void handle_read_success(std::size_t bytes_transferred, std::shared_ptr< goby::middleware::protobuf::IOData > io_msg)
void handle_write_error(const boost::system::error_code &ec)
virtual void async_read()=0
Starts an asynchronous read on the socket.
IOThread(const IOConfig &config, int index, std::string glog_group="i/o")
Constructs the thread.
SocketType & mutable_socket()
Access the (mutable) socket (or serial_port) object.
virtual void open_socket()=0
Opens the newly created socket/serial_port.
friend void basic_async_write(IOThreadImplementation *this_thread, std::shared_ptr< const goby::middleware::protobuf::IOData > io_msg)
boost::asio::io_context & mutable_io()
void handle_write_success(std::size_t bytes_transferred)
static constexpr ErrorCode IO__INIT_FAILURE
Definition io.pb.h:1933
static constexpr ErrorCode IO__WRITE_FAILURE
Definition io.pb.h:1937
static constexpr ErrorCode IO__READ_FAILURE
Definition io.pb.h:1935
void add_group(const std::string &name, Colors::Color color=Colors::nocolor, const std::string &description="")
Add another group to the logger. A group provides related manipulator for categorizing log messages.
goby::util::logger::GroupSetter group(std::string n)
detail namespace with internal helper functions
Definition json.hpp:247
@ error
throw a parse_error exception in case of a tag
void basic_async_write(IOThreadImplementation *this_thread, std::shared_ptr< const goby::middleware::protobuf::IOData > io_msg)
ProtobufEndpoint endpoint_convert(const ASIOEndpoint &asio_ep)
std::string to_string(goby::middleware::protobuf::Layer layer)
Definition common.h:44
constexpr T e
Definition constants.h:35
The global namespace for the Goby project.
util::FlexOstream glog
Access the Goby logger through this object.
STL namespace.
std::chrono::time_point< SteadyClock > time_point
static time_point now() noexcept
Returns the current steady time unless SimulatorSettings::using_sim_time == true in which case a simu...
std::chrono::microseconds duration
Duration type.