// Pure virtual: each derived class supplies the write of `io_msg`.
// The message arrives as a shared pointer to a const IOData, so an implementation
// can keep a reference to it without copying the payload.
244 virtual void async_write(std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg) = 0;
// Overrides the base class's loop(). The definition further down tests whether
// socket_ is set and open, and compares the current time with next_open_attempt_.
253 void loop()
override;
// Owned socket/serial-port object. try_open() replaces it with a new SocketType
// constructed from io_; loop() checks it for null and for is_open().
257 std::unique_ptr<SocketType> socket_;
// Mutex and owned thread, both named for incoming-mail notification.
// NOTE(review): their use is not visible here — confirm what the mutex guards
// and where the thread is started and joined.
264 std::mutex incoming_mail_notify_mutex_;
265 std::unique_ptr<std::thread> incoming_mail_notify_thread_;
// Names of the glog group and of this thread.
// NOTE(review): where these are assigned is not visible here.
267 std::string glog_group_;
268 std::string thread_name_;
// Starts out false; presumably flipped once glog_group_ has been registered — verify.
269 bool glog_group_added_{
false};
// Write helper templated on the I/O thread implementation type.
// Starts a boost::asio::async_write of io_msg->data() on this_thread->mutable_socket().
// NOTE(review): the function's name and the declaration of `this_thread` are not
// visible here — confirm against the full signature.
272template <
class IOThreadImplementation>
274 std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg)
276 boost::asio::async_write(
277 this_thread->mutable_socket(), boost::asio::buffer(io_msg->data()),
// The completion handler captures io_msg by value: boost::asio::buffer() does not
// copy the bytes, so this shared_ptr copy is what keeps the data alive until the
// handler has run.
279 [this_thread, io_msg](
const boost::system::error_code& ec, std::size_t bytes_transferred)
// Success requires both a clear error code and at least one byte written.
281 if (!ec && bytes_transferred > 0)
283 this_thread->handle_write_success(bytes_transferred);
// NOTE(review): if this is the else-branch of the check above, a completion with
// no error but zero bytes also lands here and reports a "success" error code —
// confirm that is intended.
287 this_thread->handle_write_error(ec);
// try_open(): builds a new socket and, on the path shown, publishes an IOStatus with
// state IO__LINK_OPEN. If a std::exception is caught, it publishes IO__CRITICAL_FAILURE
// with the exception text and configuration, logs the failure, doubles the retry
// backoff and schedules the next attempt.
// NOTE(review): the declarations of `now` and `error`, and the statement guarded by
// each `index() != -1` check, are not visible here — confirm before relying on them.
301 template <
class>
class ThreadType,
bool use_indexed_groups>
303 subscribe_layer, IOConfig, SocketType, ThreadType,
304 use_indexed_groups>::try_open()
// Drop any previous socket and create a fresh one on io_.
308 socket_.reset(
new SocketType(io_));
// Restore the retry delay to its minimum value.
318 backoff_interval_ = min_backoff_interval_;
// Build and publish the "link open" status.
320 auto status = std::make_shared<protobuf::IOStatus>();
// An index of -1 is special-cased; presumably the guarded statement sets the index
// on the status, as handle_read_error() does — confirm.
321 if (this->
index() != -1)
324 status->set_state(protobuf::IO__LINK_OPEN);
325 this->publish_in(status);
// Next open attempt is one backoff interval after `now`.
332 next_open_attempt_ = now + backoff_interval_;
// Failure path: report any std::exception as a critical failure.
334 catch (
const std::exception& e)
336 auto status = std::make_shared<protobuf::IOStatus>();
337 if (this->
index() != -1)
340 status->set_state(protobuf::IO__CRITICAL_FAILURE);
// Error text is the exception message followed by the configuration in short debug form.
343 error.set_text(
e.what() + std::string(
": config (") + this->cfg().ShortDebugString() +
")");
344 this->publish_in(status);
347 <<
"Failed to open/configure socket/serial_port: "
348 <<
error.ShortDebugString() << std::endl;
// Exponential backoff: double the interval while it is still below the maximum.
// NOTE(review): the doubled value is not clamped here, so it can end up above
// max_backoff_interval_ (by up to a factor of two) unless it is limited elsewhere.
350 if (backoff_interval_ < max_backoff_interval_)
351 backoff_interval_ *= 2.0;
// Schedule the retry using the (possibly doubled) interval.
354 next_open_attempt_ = now + backoff_interval_;
// Assuming backoff_interval_ is a std::chrono duration, dividing by seconds(1)
// yields a plain number of seconds for the log message.
357 << backoff_interval_ / std::chrono::seconds(1)
358 <<
" seconds" << std::endl;
// loop(): takes one branch when a socket exists and reports is_open(); a separate
// check fires once `now` is later than next_open_attempt_.
// NOTE(review): the bodies of both branches and the declaration of `now` are not
// visible here; presumably the second check retries try_open() — confirm.
367 template <
class>
class ThreadType,
bool use_indexed_groups>
369 subscribe_layer, IOConfig, SocketType, ThreadType,
370 use_indexed_groups>::loop()
// socket_ is tested for null first, so is_open() is never called through a null pointer.
372 if (socket_ && socket_->is_open())
// Strictly "later than": an attempt is not made at the exact scheduled instant.
382 if (now > next_open_attempt_)
// handle_read_error(ec): reports a failed read. Publishes an IOStatus with state
// IO__CRITICAL_FAILURE whose error text is ec.message(), then logs the error.
// @param ec error code from the failed read.
// NOTE(review): the declaration of `error` and the start of the log statement are
// not visible here.
399 template <
class>
class ThreadType,
bool use_indexed_groups>
401 line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig, SocketType, ThreadType,
402 use_indexed_groups>::handle_read_error(
const boost::system::error_code& ec)
404 auto status = std::make_shared<protobuf::IOStatus>();
// The index is copied into the status only when it is not -1.
405 if (this->index() != -1)
406 status->set_index(this->index());
408 status->set_state(protobuf::IO__CRITICAL_FAILURE);
// Human-readable description taken from the error code.
411 error.set_text(ec.message());
412 this->publish_in(status);
415 <<
"Failed to read from the socket/serial_port: "
416 << error.ShortDebugString() << std::endl;
// handle_write_error(ec): reports a failed write. Publishes an IOStatus with state
// IO__CRITICAL_FAILURE whose error text is ec.message(), then logs the error.
// @param ec error code from the failed write.
// NOTE(review): the declaration of `error` and the start of the log statement are
// not visible here.
425 template <
class>
class ThreadType,
bool use_indexed_groups>
427 line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig, SocketType, ThreadType,
428 use_indexed_groups>::handle_write_error(
const boost::system::error_code& ec)
430 auto status = std::make_shared<protobuf::IOStatus>();
// The index is copied into the status only when it is not -1.
431 if (this->index() != -1)
432 status->set_index(this->index());
434 status->set_state(protobuf::IO__CRITICAL_FAILURE);
// Human-readable description taken from the error code.
437 error.set_text(ec.message());
438 this->publish_in(status);
441 <<
"Failed to write to the socket/serial_port: "
442 << error.ShortDebugString() << std::endl;