// Pure virtual write hook: receives the outgoing message as a shared pointer
// to const IOData and must be implemented by a derived class.
232 virtual void async_write(std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg) = 0;
// Overrides a virtual loop() inherited from a base class. The out-of-line
// definition in this file branches on whether socket_ is open and on whether
// `now` has passed next_open_attempt_.
241 void loop()
override;
// Owning pointer to the underlying socket object; try_open() resets it to a
// newly constructed SocketType(io_).
245 std::unique_ptr<SocketType> socket_;
// Mutex and worker thread named for incoming-mail notification.
// NOTE(review): their use is outside the lines reviewed here — confirm what
// the mutex actually guards.
252 std::mutex incoming_mail_notify_mutex_;
253 std::unique_ptr<std::thread> incoming_mail_notify_thread_;
// String identifiers for the logging group and for this thread
// (presumably used when writing log output — TODO confirm).
255 std::string glog_group_;
256 std::string thread_name_;
// Starts out false; presumably flipped once glog_group_ has been registered
// with the logger — TODO confirm against the code that sets it.
257 bool glog_group_added_{
false};
// Function template, generic over the concrete I/O thread type
// (IOThreadImplementation). It starts a boost::asio::async_write on
// this_thread->mutable_socket() using a buffer built from io_msg->data(),
// and reports the outcome back to this_thread from the completion handler.
260template <
class IOThreadImplementation>
262 std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg)
264 boost::asio::async_write(
265 this_thread->mutable_socket(), boost::asio::buffer(io_msg->data()),
// The handler captures io_msg by value, so the shared_ptr (and therefore the
// data the buffer above was built from) stays alive until the handler has run.
267 [this_thread, io_msg](
const boost::system::error_code& ec, std::size_t bytes_transferred) {
// Success requires both: no error code and at least one byte written.
268 if (!ec && bytes_transferred > 0)
270 this_thread->handle_write_success(bytes_transferred);
// NOTE(review): this looks like the failure branch of the test above. If so,
// a completion with no error but zero bytes transferred passes a "success"
// error_code to handle_write_error — confirm that is intended.
274 this_thread->handle_write_error(ec);
// Out-of-line definition of try_open() for the I/O thread class template.
// Visible behavior: recreate the socket, publish a LINK_OPEN status and reset
// the retry backoff; if a std::exception is caught, publish a
// CRITICAL_FAILURE status, log the failure and lengthen the retry backoff.
288 template <
class>
class ThreadType,
bool use_indexed_groups>
290 subscribe_layer, IOConfig, SocketType, ThreadType,
291 use_indexed_groups>::try_open()
// Drop whatever socket_ currently owns and replace it with a new SocketType
// constructed from io_.
295 socket_.reset(
new SocketType(io_));
// Start the retry delay over from its minimum.
305 backoff_interval_ = min_backoff_interval_;
// Build and publish an IOStatus reporting the link as open. The index field
// is only filled in when index() is not -1.
307 auto status = std::make_shared<protobuf::IOStatus>();
308 if (this->index() != -1)
309 status->set_index(this->index());
311 status->set_state(protobuf::IO__LINK_OPEN);
312 this->publish_in(status);
// Earliest time another open attempt may be made.
319 next_open_attempt_ = now + backoff_interval_;
// Failure path: any std::exception lands here.
321 catch (
const std::exception& e)
// Same status construction as above, but reporting a critical failure.
323 auto status = std::make_shared<protobuf::IOStatus>();
324 if (this->index() != -1)
325 status->set_index(this->index());
327 status->set_state(protobuf::IO__CRITICAL_FAILURE);
// Error text = exception message followed by the configuration's short debug
// string, so the published status shows which configuration failed.
// NOTE(review): `error` is declared outside the lines reviewed; presumably it
// is the error sub-message of `status` — confirm.
330 error.set_text(
e.what() + std::string(
": config (") + this->cfg().ShortDebugString() +
")");
331 this->publish_in(status);
// Log the same error details.
334 <<
"Failed to open/configure socket/serial_port: "
335 <<
error.ShortDebugString() << std::endl;
// Double the retry delay while it is still below the maximum.
// NOTE(review): the comparison happens before the doubling, so the result can
// exceed max_backoff_interval_ (by up to almost 2x) unless it is clamped
// elsewhere — confirm whether a clamp is intended.
337 if (backoff_interval_ < max_backoff_interval_)
338 backoff_interval_ *= 2.0;
// Schedule the next attempt using the (possibly lengthened) delay.
341 next_open_attempt_ = now + backoff_interval_;
// Dividing by std::chrono::seconds(1) prints the delay as a number of seconds
// (assumes backoff_interval_ is a std::chrono duration — TODO confirm).
344 << backoff_interval_ / std::chrono::seconds(1)
345 <<
" seconds" << std::endl;
// Out-of-line definition of loop() for the I/O thread class template.
// Visible behavior: one branch is taken when a socket exists and is open;
// a separate check compares `now` against next_open_attempt_.
354 template <
class>
class ThreadType,
bool use_indexed_groups>
356 subscribe_layer, IOConfig, SocketType, ThreadType,
357 use_indexed_groups>::loop()
// socket_ is tested for null first, so is_open() is never called on an empty
// pointer.
359 if (socket_ && socket_->is_open())
// Only act once the scheduled retry time has passed.
// NOTE(review): presumably this guards a call to try_open() — confirm.
369 if (now > next_open_attempt_)
// Out-of-line definition of handle_read_error(): publishes a CRITICAL_FAILURE
// IOStatus carrying the error code's message and logs the failure.
// NOTE(review): the visible lines match handle_write_error apart from the log
// text; a shared helper could remove the duplication.
386 template <
class>
class ThreadType,
bool use_indexed_groups>
388 line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig, SocketType, ThreadType,
389 use_indexed_groups>::handle_read_error(
const boost::system::error_code& ec)
// Build the status message; the index field is only set when index() is not -1.
391 auto status = std::make_shared<protobuf::IOStatus>();
392 if (this->index() != -1)
393 status->set_index(this->index());
395 status->set_state(protobuf::IO__CRITICAL_FAILURE);
// Human-readable description of the error code.
// NOTE(review): `error` is declared outside the lines reviewed; presumably it
// is the error sub-message of `status` — confirm.
398 error.set_text(ec.message());
399 this->publish_in(status);
// Log the same error details.
402 <<
"Failed to read from the socket/serial_port: "
403 << error.ShortDebugString() << std::endl;
// Out-of-line definition of handle_write_error(): publishes a CRITICAL_FAILURE
// IOStatus carrying the error code's message and logs the failure.
// NOTE(review): the visible lines match handle_read_error apart from the log
// text; a shared helper could remove the duplication.
412 template <
class>
class ThreadType,
bool use_indexed_groups>
414 line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig, SocketType, ThreadType,
415 use_indexed_groups>::handle_write_error(
const boost::system::error_code& ec)
// Build the status message; the index field is only set when index() is not -1.
417 auto status = std::make_shared<protobuf::IOStatus>();
418 if (this->index() != -1)
419 status->set_index(this->index());
421 status->set_state(protobuf::IO__CRITICAL_FAILURE);
// Human-readable description of the error code.
// NOTE(review): `error` is declared outside the lines reviewed; presumably it
// is the error sub-message of `status` — confirm.
424 error.set_text(ec.message());
425 this->publish_in(status);
// Log the same error details.
428 <<
"Failed to write to the socket/serial_port: "
429 << error.ShortDebugString() << std::endl;