Goby3 3.5.1
2026.06.04
Loading...
Searching...
No Matches
io_interface.h
Go to the documentation of this file.
1// Copyright 2019-2026:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6// Copilot <198982749+Copilot@users.noreply.github.com>
7//
8//
9// This file is part of the Goby Underwater Autonomy Project Libraries
10// ("The Goby Libraries").
11//
12// The Goby Libraries are free software: you can redistribute them and/or modify
13// them under the terms of the GNU Lesser General Public License as published by
14// the Free Software Foundation, either version 2.1 of the License, or
15// (at your option) any later version.
16//
17// The Goby Libraries are distributed in the hope that they will be useful,
18// but WITHOUT ANY WARRANTY; without even the implied warranty of
19// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20// GNU Lesser General Public License for more details.
21//
22// You should have received a copy of the GNU Lesser General Public License
23// along with Goby. If not, see <http://www.gnu.org/licenses/>.
24
25#ifndef GOBY_MIDDLEWARE_IO_DETAIL_IO_INTERFACE_H
26#define GOBY_MIDDLEWARE_IO_DETAIL_IO_INTERFACE_H
27
28#include <chrono> // for seconds
29#include <exception> // for exception
30#include <memory> // for shared_ptr
31#include <mutex> // for mutex, lock_...
32#include <ostream> // for endl, size_t
33#include <string> // for string, oper...
34#include <thread> // for thread
35#include <unistd.h> // for usleep
36
37#include <boost/asio/write.hpp> // for async_write
38#include <boost/asio/post.hpp>
39#include <boost/system/error_code.hpp> // for error_code
40
41#include "goby/exception.h" // for Exception
42#include "goby/middleware/application/multi_thread.h" // for SimpleThread
43#include "goby/middleware/common.h" // for thread_id
44#include "goby/middleware/io/groups.h" // for status
45#include "goby/middleware/protobuf/io.pb.h" // for IOError, IOS...
46#include "goby/time/steady_clock.h" // for SteadyClock
48#include "goby/util/debug_logger.h" // for glog
49
50#include "io_transporters.h"
51
52namespace goby
53{
54namespace middleware
55{
56class Group;
57class InterThreadTransporter;
58template <typename InnerTransporter, typename ImplementationTag> class InterProcessForwarder;
59namespace io
60{
61enum class ThreadState
62{
64};
65
66namespace detail
67{
68template <typename ProtobufEndpoint, typename ASIOEndpoint>
69ProtobufEndpoint endpoint_convert(const ASIOEndpoint& asio_ep)
70{
71 ProtobufEndpoint pb_ep;
72 pb_ep.set_addr(asio_ep.address().to_string());
73 pb_ep.set_port(asio_ep.port());
74 return pb_ep;
75}
76
77template <const goby::middleware::Group& line_in_group,
78 const goby::middleware::Group& line_out_group, PubSubLayer publish_layer,
79 PubSubLayer subscribe_layer, typename IOConfig, typename SocketType,
80 template <class> class ThreadType, bool use_indexed_groups = false>
82 : public ThreadType<IOConfig>,
84 IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig,
85 SocketType, ThreadType, use_indexed_groups>,
86 line_in_group, publish_layer,
87 typename ThreadType<IOConfig>::Transporter::implementation_tag, use_indexed_groups>,
89 IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig,
90 SocketType, ThreadType, use_indexed_groups>,
91 line_out_group, subscribe_layer,
92 typename ThreadType<IOConfig>::Transporter::implementation_tag, use_indexed_groups>
93{
94 public:
99 IOThread(const IOConfig& config, int index, std::string glog_group = "i/o")
100 : ThreadType<IOConfig>(config, this->loop_max_frequency(), index),
102 IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig,
103 SocketType, ThreadType, use_indexed_groups>,
104 line_in_group, publish_layer,
105 typename ThreadType<IOConfig>::Transporter::implementation_tag, use_indexed_groups>(
106 index),
108 IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig,
109 SocketType, ThreadType, use_indexed_groups>,
110 line_out_group, subscribe_layer,
111 typename ThreadType<IOConfig>::Transporter::implementation_tag, use_indexed_groups>(
112 index),
113 glog_group_(glog_group + " / t" + std::to_string(goby::middleware::gettid())),
114 thread_name_(glog_group)
115 {
116 auto data_out_callback =
117 [this](std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg)
118 {
119 if (!io_msg->has_index() || io_msg->index() == this->index())
120 {
121 write(io_msg);
122 }
123 };
124
125 this->template subscribe_out<goby::middleware::protobuf::IOData>(data_out_callback);
126
127 if (!glog_group_added_)
128 {
130 glog_group_added_ = true;
131 }
132 }
133
134 void initialize() override
135 {
136 // thread to handle synchonization between boost::asio and goby condition_variable signaling
137 incoming_mail_notify_thread_.reset(new std::thread(
138 [this]()
139 {
140 while (this->alive())
141 {
142 std::unique_lock<std::mutex> lock(incoming_mail_notify_mutex_);
143 this->interthread().cv()->wait(lock);
144 // post empty handler to cause loop() to return and allow incoming mail to be handled
145 boost::asio::post(io_, []() {});
146 }
147 }));
148
149 this->set_name(thread_name_);
150 }
151
152 void finalize() override
153 {
154 // join incoming mail thread
155 {
156 std::lock_guard<std::mutex> l(incoming_mail_notify_mutex_);
157 this->interthread().cv()->notify_all();
158 }
159 incoming_mail_notify_thread_->join();
160 incoming_mail_notify_thread_.reset();
161 }
162
163 virtual ~IOThread()
164 {
165 socket_.reset();
166
167 // for non clean shutdown, avoid abort
168 if (incoming_mail_notify_thread_)
169 incoming_mail_notify_thread_->detach();
170
171 auto status = std::make_shared<protobuf::IOStatus>();
172 status->set_state(protobuf::IO__LINK_CLOSED);
173
174 this->publish_in(status);
175 this->template unsubscribe_out<goby::middleware::protobuf::IOData>();
176 }
177
178 template <class IOThreadImplementation>
179 friend void basic_async_write(IOThreadImplementation* this_thread,
180 std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg);
181
182 protected:
183 void write(std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg)
184 {
186 goby::glog << group(glog_group_) << "(" << io_msg->data().size() << "B) <"
187 << ((this->index() == -1) ? std::string() : std::to_string(this->index()))
188 << " " << io_msg->ShortDebugString() << std::endl;
189 if (io_msg->data().empty())
190 return;
191 if (!socket_ || !socket_->is_open())
192 return;
193
194 this->async_write(io_msg);
195 }
196
197 void handle_read_success(std::size_t bytes_transferred, const std::string& bytes)
198 {
199 auto io_msg = std::make_shared<goby::middleware::protobuf::IOData>();
200 *io_msg->mutable_data() = bytes;
201
202 handle_read_success(bytes_transferred, io_msg);
203 }
204
205 void handle_read_success(std::size_t bytes_transferred,
206 std::shared_ptr<goby::middleware::protobuf::IOData> io_msg)
207 {
208 if (this->index() != -1)
209 io_msg->set_index(this->index());
210
212 goby::glog << group(glog_group_) << "(" << bytes_transferred << "B) >"
213 << ((this->index() == -1) ? std::string() : std::to_string(this->index()))
214 << " " << io_msg->ShortDebugString() << std::endl;
215
216 this->publish_in(io_msg);
217 }
218
219 void handle_write_success(std::size_t bytes_transferred) {}
220 void handle_read_error(const boost::system::error_code& ec);
221 void handle_write_error(const boost::system::error_code& ec);
222
224 SocketType& mutable_socket()
225 {
226 if (socket_)
227 return *socket_;
228 else
229 throw goby::Exception("Attempted to access null socket/serial_port");
230 }
231
233
235 bool socket_is_open() { return socket_ && socket_->is_open(); }
236
238 virtual void open_socket() = 0;
239
241 virtual void async_read() = 0;
242
244 virtual void async_write(std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg) = 0;
245
246 const std::string& glog_group() { return glog_group_; }
247
248 private:
250 void try_open();
251
253 void loop() override;
254
255 private:
257 std::unique_ptr<SocketType> socket_;
258
259 const goby::time::SteadyClock::duration min_backoff_interval_{std::chrono::seconds(1)};
260 const goby::time::SteadyClock::duration max_backoff_interval_{std::chrono::seconds(128)};
261 goby::time::SteadyClock::duration backoff_interval_{min_backoff_interval_};
263
264 std::mutex incoming_mail_notify_mutex_;
265 std::unique_ptr<std::thread> incoming_mail_notify_thread_;
266
267 std::string glog_group_;
268 std::string thread_name_;
269 bool glog_group_added_{false};
270};
271
272template <class IOThreadImplementation>
273void basic_async_write(IOThreadImplementation* this_thread,
274 std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg)
275{
276 boost::asio::async_write(
277 this_thread->mutable_socket(), boost::asio::buffer(io_msg->data()),
278 // capture io_msg in callback to ensure write buffer exists until async_write is done
279 [this_thread, io_msg](const boost::system::error_code& ec, std::size_t bytes_transferred)
280 {
281 if (!ec && bytes_transferred > 0)
282 {
283 this_thread->handle_write_success(bytes_transferred);
284 }
285 else
286 {
287 this_thread->handle_write_error(ec);
288 }
289 });
290}
291
292} // namespace detail
293} // namespace io
294} // namespace middleware
295} // namespace goby
296
297template <const goby::middleware::Group& line_in_group,
298 const goby::middleware::Group& line_out_group,
300 goby::middleware::io::PubSubLayer subscribe_layer, typename IOConfig, typename SocketType,
301 template <class> class ThreadType, bool use_indexed_groups>
302void goby::middleware::io::detail::IOThread<line_in_group, line_out_group, publish_layer,
303 subscribe_layer, IOConfig, SocketType, ThreadType,
304 use_indexed_groups>::try_open()
305{
306 try
307 {
308 socket_.reset(new SocketType(io_));
309 open_socket();
310
311 // messages read from the socket
312 this->async_read();
313
314 // reset io_context, which ran out of work
315 io_.restart();
316
317 // successful, reset backoff
318 backoff_interval_ = min_backoff_interval_;
319
320 auto status = std::make_shared<protobuf::IOStatus>();
321 if (this->index() != -1)
322 status->set_index(this->index());
323
324 status->set_state(protobuf::IO__LINK_OPEN);
325 this->publish_in(status);
326
327 goby::glog.is_debug2() && goby::glog << group(glog_group_) << "Successfully opened socket"
328 << std::endl;
329
330 // update to avoid thrashing on open success but read/write failure
331 decltype(next_open_attempt_) now(goby::time::SteadyClock::now());
332 next_open_attempt_ = now + backoff_interval_;
333 }
334 catch (const std::exception& e)
335 {
336 auto status = std::make_shared<protobuf::IOStatus>();
337 if (this->index() != -1)
338 status->set_index(this->index());
339
340 status->set_state(protobuf::IO__CRITICAL_FAILURE);
343 error.set_text(e.what() + std::string(": config (") + this->cfg().ShortDebugString() + ")");
344 this->publish_in(status);
345
346 goby::glog.is_warn() && goby::glog << group(glog_group_)
347 << "Failed to open/configure socket/serial_port: "
348 << error.ShortDebugString() << std::endl;
349
350 if (backoff_interval_ < max_backoff_interval_)
351 backoff_interval_ *= 2.0;
352
353 decltype(next_open_attempt_) now(goby::time::SteadyClock::now());
354 next_open_attempt_ = now + backoff_interval_;
355
356 goby::glog.is_warn() && goby::glog << group(glog_group_) << "Will retry in "
357 << backoff_interval_ / std::chrono::seconds(1)
358 << " seconds" << std::endl;
359 socket_.reset();
360 }
361}
362
363template <const goby::middleware::Group& line_in_group,
364 const goby::middleware::Group& line_out_group,
366 goby::middleware::io::PubSubLayer subscribe_layer, typename IOConfig, typename SocketType,
367 template <class> class ThreadType, bool use_indexed_groups>
368void goby::middleware::io::detail::IOThread<line_in_group, line_out_group, publish_layer,
369 subscribe_layer, IOConfig, SocketType, ThreadType,
370 use_indexed_groups>::loop()
371{
372 if (socket_ && socket_->is_open())
373 {
374 // run the io service (blocks until either we read something
375 // from the socket or a subscription is available
376 // as signaled from an empty handler in the incoming_mail_notify_thread)
377 io_.run_one();
378 }
379 else
380 {
381 decltype(next_open_attempt_) now(goby::time::SteadyClock::now());
382 if (now > next_open_attempt_)
383 {
384 try_open();
385 }
386 else
387 {
388 // poll in case we need to quit
389 io_.poll();
390 usleep(100000); // avoid pegging CPU while waiting to attempt reopening socket
391 }
392 }
393}
394
395template <const goby::middleware::Group& line_in_group,
396 const goby::middleware::Group& line_out_group,
398 goby::middleware::io::PubSubLayer subscribe_layer, typename IOConfig, typename SocketType,
399 template <class> class ThreadType, bool use_indexed_groups>
401 line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig, SocketType, ThreadType,
402 use_indexed_groups>::handle_read_error(const boost::system::error_code& ec)
403{
404 auto status = std::make_shared<protobuf::IOStatus>();
405 if (this->index() != -1)
406 status->set_index(this->index());
407
408 status->set_state(protobuf::IO__CRITICAL_FAILURE);
409 goby::middleware::protobuf::IOError& error = *status->mutable_error();
411 error.set_text(ec.message());
412 this->publish_in(status);
413
414 goby::glog.is_warn() && goby::glog << group(glog_group_)
415 << "Failed to read from the socket/serial_port: "
416 << error.ShortDebugString() << std::endl;
417
418 socket_.reset();
419}
420
421template <const goby::middleware::Group& line_in_group,
422 const goby::middleware::Group& line_out_group,
424 goby::middleware::io::PubSubLayer subscribe_layer, typename IOConfig, typename SocketType,
425 template <class> class ThreadType, bool use_indexed_groups>
427 line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig, SocketType, ThreadType,
428 use_indexed_groups>::handle_write_error(const boost::system::error_code& ec)
429{
430 auto status = std::make_shared<protobuf::IOStatus>();
431 if (this->index() != -1)
432 status->set_index(this->index());
433
434 status->set_state(protobuf::IO__CRITICAL_FAILURE);
435 goby::middleware::protobuf::IOError& error = *status->mutable_error();
437 error.set_text(ec.message());
438 this->publish_in(status);
439
440 goby::glog.is_warn() && goby::glog << group(glog_group_)
441 << "Failed to write to the socket/serial_port: "
442 << error.ShortDebugString() << std::endl;
443 socket_.reset();
444}
445
446#endif
simple exception class for goby applications
Definition exception.h:35
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:61
void write(std::shared_ptr< const goby::middleware::protobuf::IOData > io_msg)
void handle_read_success(std::size_t bytes_transferred, const std::string &bytes)
virtual void async_write(std::shared_ptr< const goby::middleware::protobuf::IOData > io_msg)=0
Starts an asynchronous write from data published.
bool socket_is_open()
Does the socket exist and is it open?
void handle_read_error(const boost::system::error_code &ec)
void handle_read_success(std::size_t bytes_transferred, std::shared_ptr< goby::middleware::protobuf::IOData > io_msg)
void handle_write_error(const boost::system::error_code &ec)
virtual void async_read()=0
Starts an asynchronous read on the socket.
IOThread(const IOConfig &config, int index, std::string glog_group="i/o")
Constructs the thread.
SocketType & mutable_socket()
Access the (mutable) socket (or serial_port) object.
virtual void open_socket()=0
Opens the newly created socket/serial_port.
friend void basic_async_write(IOThreadImplementation *this_thread, std::shared_ptr< const goby::middleware::protobuf::IOData > io_msg)
boost::asio::io_context & mutable_io()
void handle_write_success(std::size_t bytes_transferred)
static constexpr ErrorCode IO__INIT_FAILURE
Definition io.pb.h:1933
static constexpr ErrorCode IO__WRITE_FAILURE
Definition io.pb.h:1937
static constexpr ErrorCode IO__READ_FAILURE
Definition io.pb.h:1935
void add_group(const std::string &name, Colors::Color color=Colors::nocolor, const std::string &description="")
Add another group to the logger. A group provides related manipulator for categorizing log messages.
goby::util::logger::GroupSetter group(std::string n)
detail namespace with internal helper functions
Definition json.hpp:263
@ error
throw a parse_error exception in case of a tag
void basic_async_write(IOThreadImplementation *this_thread, std::shared_ptr< const goby::middleware::protobuf::IOData > io_msg)
ProtobufEndpoint endpoint_convert(const ASIOEndpoint &asio_ep)
std::string to_string(goby::middleware::protobuf::Layer layer)
Definition common.h:44
middleware::InterProcessForwarder< InnerTransporter, detail::InterProcessTag > InterProcessForwarder
constexpr T e
Definition constants.h:35
The global namespace for the Goby project.
util::FlexOstream glog
Access the Goby logger through this object.
uint32_t index(const std::array< int8_t, 256 > &rdata, char symbol)
Definition base.h:140
STL namespace.
std::chrono::time_point< SteadyClock > time_point
static time_point now() noexcept
Returns the current steady time unless SimulatorSettings::using_sim_time == true in which case a simu...
std::chrono::microseconds duration
Duration type.