Goby3 3.4.0
2026.04.13
Loading...
Searching...
No Matches
io_interface.h
Go to the documentation of this file.
1// Copyright 2019-2026:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6//
7//
8// This file is part of the Goby Underwater Autonomy Project Libraries
9// ("The Goby Libraries").
10//
11// The Goby Libraries are free software: you can redistribute them and/or modify
12// them under the terms of the GNU Lesser General Public License as published by
13// the Free Software Foundation, either version 2.1 of the License, or
14// (at your option) any later version.
15//
16// The Goby Libraries are distributed in the hope that they will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU Lesser General Public License for more details.
20//
21// You should have received a copy of the GNU Lesser General Public License
22// along with Goby. If not, see <http://www.gnu.org/licenses/>.
23
24#ifndef GOBY_MIDDLEWARE_IO_DETAIL_IO_INTERFACE_H
25#define GOBY_MIDDLEWARE_IO_DETAIL_IO_INTERFACE_H
26
27#include <chrono> // for seconds
28#include <exception> // for exception
29#include <memory> // for shared_ptr
30#include <mutex> // for mutex, lock_...
31#include <ostream> // for endl, size_t
32#include <string> // for string, oper...
33#include <thread> // for thread
34#include <unistd.h> // for usleep
35
36#include <boost/asio/write.hpp> // for async_write
37#include <boost/system/error_code.hpp> // for error_code
38
39#include "goby/exception.h" // for Exception
40#include "goby/middleware/application/multi_thread.h" // for SimpleThread
41#include "goby/middleware/common.h" // for thread_id
42#include "goby/middleware/io/groups.h" // for status
43#include "goby/middleware/protobuf/io.pb.h" // for IOError, IOS...
44#include "goby/time/steady_clock.h" // for SteadyClock
46#include "goby/util/debug_logger.h" // for glog
47
48#include "io_transporters.h"
49
50namespace goby
51{
52namespace middleware
53{
54class Group;
55class InterThreadTransporter;
56template <typename InnerTransporter, typename ImplementationTag> class InterProcessForwarder;
57namespace io
58{
59enum class ThreadState
60{
62};
63
64namespace detail
65{
66template <typename ProtobufEndpoint, typename ASIOEndpoint>
67ProtobufEndpoint endpoint_convert(const ASIOEndpoint& asio_ep)
68{
69 ProtobufEndpoint pb_ep;
70 pb_ep.set_addr(asio_ep.address().to_string());
71 pb_ep.set_port(asio_ep.port());
72 return pb_ep;
73}
74
75template <const goby::middleware::Group& line_in_group,
76 const goby::middleware::Group& line_out_group, PubSubLayer publish_layer,
77 PubSubLayer subscribe_layer, typename IOConfig, typename SocketType,
78 template <class> class ThreadType, bool use_indexed_groups = false>
80 : public ThreadType<IOConfig>,
82 IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig,
83 SocketType, ThreadType, use_indexed_groups>,
84 line_in_group, publish_layer,
85 typename ThreadType<IOConfig>::Transporter::implementation_tag, use_indexed_groups>,
87 IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig,
88 SocketType, ThreadType, use_indexed_groups>,
89 line_out_group, subscribe_layer,
90 typename ThreadType<IOConfig>::Transporter::implementation_tag, use_indexed_groups>
91{
92 public:
97 IOThread(const IOConfig& config, int index, std::string glog_group = "i/o")
98 : ThreadType<IOConfig>(config, this->loop_max_frequency(), index),
100 IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig,
101 SocketType, ThreadType, use_indexed_groups>,
102 line_in_group, publish_layer,
103 typename ThreadType<IOConfig>::Transporter::implementation_tag, use_indexed_groups>(
104 index),
106 IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig,
107 SocketType, ThreadType, use_indexed_groups>,
108 line_out_group, subscribe_layer,
109 typename ThreadType<IOConfig>::Transporter::implementation_tag, use_indexed_groups>(
110 index),
111 glog_group_(glog_group + " / t" + std::to_string(goby::middleware::gettid())),
112 thread_name_(glog_group)
113 {
114 auto data_out_callback =
115 [this](std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg)
116 {
117 if (!io_msg->has_index() || io_msg->index() == this->index())
118 {
119 write(io_msg);
120 }
121 };
122
123 this->template subscribe_out<goby::middleware::protobuf::IOData>(data_out_callback);
124
125 if (!glog_group_added_)
126 {
128 glog_group_added_ = true;
129 }
130 }
131
132 void initialize() override
133 {
134 // thread to handle synchonization between boost::asio and goby condition_variable signaling
135 incoming_mail_notify_thread_.reset(new std::thread(
136 [this]()
137 {
138 while (this->alive())
139 {
140 std::unique_lock<std::mutex> lock(incoming_mail_notify_mutex_);
141 this->interthread().cv()->wait(lock);
142 // post empty handler to cause loop() to return and allow incoming mail to be handled
143 io_.post([]() {});
144 }
145 }));
146
147 this->set_name(thread_name_);
148 }
149
150 void finalize() override
151 {
152 // join incoming mail thread
153 {
154 std::lock_guard<std::mutex> l(incoming_mail_notify_mutex_);
155 this->interthread().cv()->notify_all();
156 }
157 incoming_mail_notify_thread_->join();
158 incoming_mail_notify_thread_.reset();
159 }
160
161 virtual ~IOThread()
162 {
163 socket_.reset();
164
165 // for non clean shutdown, avoid abort
166 if (incoming_mail_notify_thread_)
167 incoming_mail_notify_thread_->detach();
168
169 auto status = std::make_shared<protobuf::IOStatus>();
170 status->set_state(protobuf::IO__LINK_CLOSED);
171
172 this->publish_in(status);
173 this->template unsubscribe_out<goby::middleware::protobuf::IOData>();
174 }
175
176 template <class IOThreadImplementation>
177 friend void basic_async_write(IOThreadImplementation* this_thread,
178 std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg);
179
180 protected:
181 void write(std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg)
182 {
184 goby::glog << group(glog_group_) << "(" << io_msg->data().size() << "B) <"
185 << ((this->index() == -1) ? std::string() : std::to_string(this->index()))
186 << " " << io_msg->ShortDebugString() << std::endl;
187 if (io_msg->data().empty())
188 return;
189 if (!socket_ || !socket_->is_open())
190 return;
191
192 this->async_write(io_msg);
193 }
194
195 void handle_read_success(std::size_t bytes_transferred, const std::string& bytes)
196 {
197 auto io_msg = std::make_shared<goby::middleware::protobuf::IOData>();
198 *io_msg->mutable_data() = bytes;
199
200 handle_read_success(bytes_transferred, io_msg);
201 }
202
203 void handle_read_success(std::size_t bytes_transferred,
204 std::shared_ptr<goby::middleware::protobuf::IOData> io_msg)
205 {
206 if (this->index() != -1)
207 io_msg->set_index(this->index());
208
210 goby::glog << group(glog_group_) << "(" << bytes_transferred << "B) >"
211 << ((this->index() == -1) ? std::string() : std::to_string(this->index()))
212 << " " << io_msg->ShortDebugString() << std::endl;
213
214 this->publish_in(io_msg);
215 }
216
217 void handle_write_success(std::size_t bytes_transferred) {}
218 void handle_read_error(const boost::system::error_code& ec);
219 void handle_write_error(const boost::system::error_code& ec);
220
222 SocketType& mutable_socket()
223 {
224 if (socket_)
225 return *socket_;
226 else
227 throw goby::Exception("Attempted to access null socket/serial_port");
228 }
229
231
233 bool socket_is_open() { return socket_ && socket_->is_open(); }
234
236 virtual void open_socket() = 0;
237
239 virtual void async_read() = 0;
240
242 virtual void async_write(std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg) = 0;
243
244 const std::string& glog_group() { return glog_group_; }
245
246 private:
248 void try_open();
249
251 void loop() override;
252
253 private:
255 std::unique_ptr<SocketType> socket_;
256
257 const goby::time::SteadyClock::duration min_backoff_interval_{std::chrono::seconds(1)};
258 const goby::time::SteadyClock::duration max_backoff_interval_{std::chrono::seconds(128)};
259 goby::time::SteadyClock::duration backoff_interval_{min_backoff_interval_};
261
262 std::mutex incoming_mail_notify_mutex_;
263 std::unique_ptr<std::thread> incoming_mail_notify_thread_;
264
265 std::string glog_group_;
266 std::string thread_name_;
267 bool glog_group_added_{false};
268};
269
270template <class IOThreadImplementation>
271void basic_async_write(IOThreadImplementation* this_thread,
272 std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg)
273{
274 boost::asio::async_write(
275 this_thread->mutable_socket(), boost::asio::buffer(io_msg->data()),
276 // capture io_msg in callback to ensure write buffer exists until async_write is done
277 [this_thread, io_msg](const boost::system::error_code& ec, std::size_t bytes_transferred)
278 {
279 if (!ec && bytes_transferred > 0)
280 {
281 this_thread->handle_write_success(bytes_transferred);
282 }
283 else
284 {
285 this_thread->handle_write_error(ec);
286 }
287 });
288}
289
290} // namespace detail
291} // namespace io
292} // namespace middleware
293} // namespace goby
294
295template <const goby::middleware::Group& line_in_group,
296 const goby::middleware::Group& line_out_group,
298 goby::middleware::io::PubSubLayer subscribe_layer, typename IOConfig, typename SocketType,
299 template <class> class ThreadType, bool use_indexed_groups>
300void goby::middleware::io::detail::IOThread<line_in_group, line_out_group, publish_layer,
301 subscribe_layer, IOConfig, SocketType, ThreadType,
302 use_indexed_groups>::try_open()
303{
304 try
305 {
306 socket_.reset(new SocketType(io_));
307 open_socket();
308
309 // messages read from the socket
310 this->async_read();
311
312 // reset io_context, which ran out of work
313 io_.reset();
314
315 // successful, reset backoff
316 backoff_interval_ = min_backoff_interval_;
317
318 auto status = std::make_shared<protobuf::IOStatus>();
319 if (this->index() != -1)
320 status->set_index(this->index());
321
322 status->set_state(protobuf::IO__LINK_OPEN);
323 this->publish_in(status);
324
325 goby::glog.is_debug2() && goby::glog << group(glog_group_) << "Successfully opened socket"
326 << std::endl;
327
328 // update to avoid thrashing on open success but read/write failure
329 decltype(next_open_attempt_) now(goby::time::SteadyClock::now());
330 next_open_attempt_ = now + backoff_interval_;
331 }
332 catch (const std::exception& e)
333 {
334 auto status = std::make_shared<protobuf::IOStatus>();
335 if (this->index() != -1)
336 status->set_index(this->index());
337
338 status->set_state(protobuf::IO__CRITICAL_FAILURE);
341 error.set_text(e.what() + std::string(": config (") + this->cfg().ShortDebugString() + ")");
342 this->publish_in(status);
343
344 goby::glog.is_warn() && goby::glog << group(glog_group_)
345 << "Failed to open/configure socket/serial_port: "
346 << error.ShortDebugString() << std::endl;
347
348 if (backoff_interval_ < max_backoff_interval_)
349 backoff_interval_ *= 2.0;
350
351 decltype(next_open_attempt_) now(goby::time::SteadyClock::now());
352 next_open_attempt_ = now + backoff_interval_;
353
354 goby::glog.is_warn() && goby::glog << group(glog_group_) << "Will retry in "
355 << backoff_interval_ / std::chrono::seconds(1)
356 << " seconds" << std::endl;
357 socket_.reset();
358 }
359}
360
361template <const goby::middleware::Group& line_in_group,
362 const goby::middleware::Group& line_out_group,
364 goby::middleware::io::PubSubLayer subscribe_layer, typename IOConfig, typename SocketType,
365 template <class> class ThreadType, bool use_indexed_groups>
366void goby::middleware::io::detail::IOThread<line_in_group, line_out_group, publish_layer,
367 subscribe_layer, IOConfig, SocketType, ThreadType,
368 use_indexed_groups>::loop()
369{
370 if (socket_ && socket_->is_open())
371 {
372 // run the io service (blocks until either we read something
373 // from the socket or a subscription is available
374 // as signaled from an empty handler in the incoming_mail_notify_thread)
375 io_.run_one();
376 }
377 else
378 {
379 decltype(next_open_attempt_) now(goby::time::SteadyClock::now());
380 if (now > next_open_attempt_)
381 {
382 try_open();
383 }
384 else
385 {
386 // poll in case we need to quit
387 io_.poll();
388 usleep(100000); // avoid pegging CPU while waiting to attempt reopening socket
389 }
390 }
391}
392
393template <const goby::middleware::Group& line_in_group,
394 const goby::middleware::Group& line_out_group,
396 goby::middleware::io::PubSubLayer subscribe_layer, typename IOConfig, typename SocketType,
397 template <class> class ThreadType, bool use_indexed_groups>
399 line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig, SocketType, ThreadType,
400 use_indexed_groups>::handle_read_error(const boost::system::error_code& ec)
401{
402 auto status = std::make_shared<protobuf::IOStatus>();
403 if (this->index() != -1)
404 status->set_index(this->index());
405
406 status->set_state(protobuf::IO__CRITICAL_FAILURE);
407 goby::middleware::protobuf::IOError& error = *status->mutable_error();
409 error.set_text(ec.message());
410 this->publish_in(status);
411
412 goby::glog.is_warn() && goby::glog << group(glog_group_)
413 << "Failed to read from the socket/serial_port: "
414 << error.ShortDebugString() << std::endl;
415
416 socket_.reset();
417}
418
419template <const goby::middleware::Group& line_in_group,
420 const goby::middleware::Group& line_out_group,
422 goby::middleware::io::PubSubLayer subscribe_layer, typename IOConfig, typename SocketType,
423 template <class> class ThreadType, bool use_indexed_groups>
425 line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig, SocketType, ThreadType,
426 use_indexed_groups>::handle_write_error(const boost::system::error_code& ec)
427{
428 auto status = std::make_shared<protobuf::IOStatus>();
429 if (this->index() != -1)
430 status->set_index(this->index());
431
432 status->set_state(protobuf::IO__CRITICAL_FAILURE);
433 goby::middleware::protobuf::IOError& error = *status->mutable_error();
435 error.set_text(ec.message());
436 this->publish_in(status);
437
438 goby::glog.is_warn() && goby::glog << group(glog_group_)
439 << "Failed to write to the socket/serial_port: "
440 << error.ShortDebugString() << std::endl;
441 socket_.reset();
442}
443
444#endif
simple exception class for goby applications
Definition exception.h:35
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:60
void write(std::shared_ptr< const goby::middleware::protobuf::IOData > io_msg)
void handle_read_success(std::size_t bytes_transferred, const std::string &bytes)
virtual void async_write(std::shared_ptr< const goby::middleware::protobuf::IOData > io_msg)=0
Starts an asynchronous write from data published.
bool socket_is_open()
Does the socket exist and is it open?
void handle_read_error(const boost::system::error_code &ec)
void handle_read_success(std::size_t bytes_transferred, std::shared_ptr< goby::middleware::protobuf::IOData > io_msg)
void handle_write_error(const boost::system::error_code &ec)
virtual void async_read()=0
Starts an asynchronous read on the socket.
IOThread(const IOConfig &config, int index, std::string glog_group="i/o")
Constructs the thread.
SocketType & mutable_socket()
Access the (mutable) socket (or serial_port) object.
virtual void open_socket()=0
Opens the newly created socket/serial_port.
friend void basic_async_write(IOThreadImplementation *this_thread, std::shared_ptr< const goby::middleware::protobuf::IOData > io_msg)
boost::asio::io_context & mutable_io()
void handle_write_success(std::size_t bytes_transferred)
static constexpr ErrorCode IO__INIT_FAILURE
Definition io.pb.h:1933
static constexpr ErrorCode IO__WRITE_FAILURE
Definition io.pb.h:1937
static constexpr ErrorCode IO__READ_FAILURE
Definition io.pb.h:1935
void add_group(const std::string &name, Colors::Color color=Colors::nocolor, const std::string &description="")
Add another group to the logger. A group provides related manipulator for categorizing log messages.
goby::util::logger::GroupSetter group(std::string n)
detail namespace with internal helper functions
Definition json.hpp:247
@ error
throw a parse_error exception in case of a tag
void basic_async_write(IOThreadImplementation *this_thread, std::shared_ptr< const goby::middleware::protobuf::IOData > io_msg)
ProtobufEndpoint endpoint_convert(const ASIOEndpoint &asio_ep)
std::string to_string(goby::middleware::protobuf::Layer layer)
Definition common.h:44
middleware::InterProcessForwarder< InnerTransporter, detail::InterProcessTag > InterProcessForwarder
constexpr T e
Definition constants.h:35
The global namespace for the Goby project.
util::FlexOstream glog
Access the Goby logger through this object.
STL namespace.
std::chrono::time_point< SteadyClock > time_point
static time_point now() noexcept
Returns the current steady time unless SimulatorSettings::using_sim_time == true in which case a simu...
std::chrono::microseconds duration
Duration type.