// Pure virtual write hook: every concrete subclass must provide the asynchronous write
// of `io_msg`. The message is passed as a shared_ptr to const IOData, so an implementation
// can hold on to it (extending its lifetime) but cannot modify it.
242 virtual void async_write(std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg) = 0;
// Declaration of loop(); `override` means it replaces a virtual loop() inherited from a
// base class (the base class itself is not visible in this part of the file).
251 void loop()
override;
// Exclusively-owned socket object. The try_open() definition in this file resets it to a
// newly constructed SocketType, and loop() tests it for null / is_open().
255 std::unique_ptr<SocketType> socket_;
// Mutex and heap-allocated thread related to incoming-mail notification (going by the
// names only -- the code that uses them is not visible here; TODO confirm their roles).
262 std::mutex incoming_mail_notify_mutex_;
263 std::unique_ptr<std::thread> incoming_mail_notify_thread_;
// Names of the glog group and of this thread; presumably used to tag log output --
// TODO confirm against the code that sets them.
265 std::string glog_group_;
266 std::string thread_name_;
// Starts out false; presumably flips once glog_group_ has been registered with the
// logger -- TODO confirm.
267 bool glog_group_added_{
false};
// Function template parameterized on the concrete IO thread type. The line carrying the
// function's name and the declaration of `this_thread` is not present in this listing.
// Visible behavior: starts a boost::asio::async_write of io_msg->data() on
// this_thread->mutable_socket() and reports the outcome back to `this_thread`.
270template <
class IOThreadImplementation>
272 std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg)
274 boost::asio::async_write(
275 this_thread->mutable_socket(), boost::asio::buffer(io_msg->data()),
// io_msg is captured by value: boost::asio::buffer() does not own the bytes it refers to,
// so holding the shared_ptr in the handler keeps the data alive until the write completes.
277 [this_thread, io_msg](
const boost::system::error_code& ec, std::size_t bytes_transferred)
// Treated as success only when there is no error AND at least one byte was written.
279 if (!ec && bytes_transferred > 0)
281 this_thread->handle_write_success(bytes_transferred);
// Presumably the else-branch of the check above (the `else` line is not visible here).
// NOTE(review): if so, a completion with no error but zero bytes transferred also ends up
// here and hands a non-error `ec` to handle_write_error -- confirm that is intended.
285 this_thread->handle_write_error(ec);
// try_open(): out-of-class definition of a member of a class template (only the tail of
// the template parameter/argument lists is visible). The listing omits several lines of
// this function -- braces, the `try` keyword, whatever call actually opens/configures the
// socket, and the declarations of `now` and `error` -- so the notes below cover only what
// is shown.
//
// Before the catch: replace socket_ with a new SocketType, reset the backoff to its
// minimum, publish an IOStatus in state IO__LINK_OPEN and schedule next_open_attempt_.
// In the std::exception handler: publish an IOStatus in state IO__CRITICAL_FAILURE, log
// the failure, grow the backoff and reschedule next_open_attempt_.
299 template <
class>
class ThreadType,
bool use_indexed_groups>
301 subscribe_layer, IOConfig, SocketType, ThreadType,
302 use_indexed_groups>::try_open()
// Any previously held socket object is destroyed by this reset.
306 socket_.reset(
new SocketType(io_));
// Reaching this point without an exception: drop the retry delay back to the minimum.
316 backoff_interval_ = min_backoff_interval_;
318 auto status = std::make_shared<protobuf::IOStatus>();
// The index is only copied into the status when index() is not -1
// (presumably -1 means "no index assigned" -- TODO confirm).
319 if (this->index() != -1)
320 status->set_index(this->index());
322 status->set_state(protobuf::IO__LINK_OPEN);
323 this->publish_in(status);
// Even after a successful open the next attempt time is pushed out by the (just reset)
// backoff interval. `now` is declared on a line not shown.
330 next_open_attempt_ = now + backoff_interval_;
332 catch (
const std::exception& e)
334 auto status = std::make_shared<protobuf::IOStatus>();
335 if (this->index() != -1)
336 status->set_index(this->index());
338 status->set_state(protobuf::IO__CRITICAL_FAILURE);
// `error` is declared on a line not shown (presumably a sub-message of `status` --
// TODO confirm). Its text is the exception message followed by the configuration dump.
341 error.set_text(
e.what() + std::string(
": config (") + this->cfg().ShortDebugString() +
")");
342 this->publish_in(status);
345 <<
"Failed to open/configure socket/serial_port: "
346 <<
error.ShortDebugString() << std::endl;
// Exponential backoff: double the delay as long as it is still below the maximum.
// NOTE(review): the doubled value is not clamped, so it can end up above
// max_backoff_interval_ (anything below the max is doubled once more) -- confirm
// whether a clamp is wanted.
348 if (backoff_interval_ < max_backoff_interval_)
349 backoff_interval_ *= 2.0;
352 next_open_attempt_ = now + backoff_interval_;
// Assuming backoff_interval_ is a std::chrono duration, dividing by one second yields
// the plain number of seconds for the log message.
355 << backoff_interval_ / std::chrono::seconds(1)
356 <<
" seconds" << std::endl;
// loop(): out-of-class definition of a member of a class template. Only the two
// conditions are visible in this listing; the statements they guard (and the declaration
// of `now`) are on lines not shown, so what each branch does cannot be stated from here.
365 template <
class>
class ThreadType,
bool use_indexed_groups>
367 subscribe_layer, IOConfig, SocketType, ThreadType,
368 use_indexed_groups>::loop()
// True only when a socket object exists and reports itself open.
370 if (socket_ && socket_->is_open())
// True once the current time has passed the scheduled next open attempt; presumably this
// is where a new open is tried -- TODO confirm against the full source.
380 if (now > next_open_attempt_)
// handle_read_error(ec): reports a failed read. From the visible lines it publishes an
// IOStatus in state IO__CRITICAL_FAILURE whose error text is ec.message(), then writes a
// "Failed to read ..." log line. The declaration of `error`, the start of the logging
// statement and anything after the log line are not present in this listing.
397 template <
class>
class ThreadType,
bool use_indexed_groups>
399 line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig, SocketType, ThreadType,
400 use_indexed_groups>::handle_read_error(
const boost::system::error_code& ec)
402 auto status = std::make_shared<protobuf::IOStatus>();
// The index is only copied into the status when index() is not -1
// (presumably -1 means "no index assigned" -- TODO confirm).
403 if (this->index() != -1)
404 status->set_index(this->index());
406 status->set_state(protobuf::IO__CRITICAL_FAILURE);
// `error` is declared on a line not shown (presumably a sub-message of `status` --
// TODO confirm); it receives the human-readable description of the error code.
409 error.set_text(ec.message());
410 this->publish_in(status);
413 <<
"Failed to read from the socket/serial_port: "
414 << error.ShortDebugString() << std::endl;
// handle_write_error(ec): reports a failed write. From the visible lines it publishes an
// IOStatus in state IO__CRITICAL_FAILURE whose error text is ec.message(), then writes a
// "Failed to write ..." log line. The declaration of `error`, the start of the logging
// statement and anything after the log line are not present in this listing.
423 template <
class>
class ThreadType,
bool use_indexed_groups>
425 line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig, SocketType, ThreadType,
426 use_indexed_groups>::handle_write_error(
const boost::system::error_code& ec)
428 auto status = std::make_shared<protobuf::IOStatus>();
// The index is only copied into the status when index() is not -1
// (presumably -1 means "no index assigned" -- TODO confirm).
429 if (this->index() != -1)
430 status->set_index(this->index());
432 status->set_state(protobuf::IO__CRITICAL_FAILURE);
// `error` is declared on a line not shown (presumably a sub-message of `status` --
// TODO confirm); it receives the human-readable description of the error code.
435 error.set_text(ec.message());
436 this->publish_in(status);
439 <<
"Failed to write to the socket/serial_port: "
440 << error.ShortDebugString() << std::endl;