23 #include "pb_modem_driver.h" 25 #include "goby/acomms/modemdriver/driver_exception.h" 26 #include "goby/acomms/modemdriver/mm_driver.h" 27 #include "goby/common/logger.h" 28 #include "goby/util/binary.h" 31 using goby::util::hex_decode;
32 using goby::util::hex_encode;
// PBDriver constructor: member-initializer list followed by the constructor body.
// NOTE(review): the signature line and the braces are not part of this excerpt.
//
// Initial state established here:
//   - the StaticProtobufNode base and zeromq_service_ both receive the same
//     zeromq_service argument,
//   - last_send_time_ starts at the current goby_time<uint64>() value,
//   - request_socket_id_ = 0, query_interval_seconds_ = 1,
//     reset_interval_seconds_ = 120 (all three are reassigned elsewhere in this
//     file from the PBDriverConfig extensions of the driver configuration),
//   - waiting_for_reply_ = false and next_frame_ = 0.
37 : StaticProtobufNode(zeromq_service), zeromq_service_(zeromq_service),
38 last_send_time_(
goby_time<
uint64>()), request_socket_id_(0), query_interval_seconds_(1),
39 reset_interval_seconds_(120), waiting_for_reply_(false), next_frame_(0)
// Register PBDriver::handle_response as the receipt callback for
// StoreServerResponse messages.
// NOTE(review): the leading 0 is presumably a socket id -- confirm against the
// on_receipt() declaration.
41 on_receipt<acomms::protobuf::StoreServerResponse>(0, &PBDriver::handle_response,
this);
// Configuration step: copies settings out of driver_cfg_ into the request
// template, the ZeroMQ service configuration and the cached timing members.
// NOTE(review): presumably the body of startup(cfg); the signature and the
// statement that fills driver_cfg_ are not part of this excerpt.

// Stamp the reusable request_ message with this driver's modem id.
47 request_.set_modem_id(driver_cfg_.modem_id());
// Append the configured request socket to service_cfg_ and apply that
// configuration to the ZeroMQ service.
49 service_cfg_.add_socket()->CopyFrom(driver_cfg_.GetExtension(PBDriverConfig::request_socket));
50 zeromq_service_->set_cfg(service_cfg_);
// Cache the id of that same socket; it is the socket argument of send() where
// request_ is transmitted.
52 request_socket_id_ = driver_cfg_.GetExtension(PBDriverConfig::request_socket).socket_id();
// Minimum spacing between two requests (compared against last_send_time_ when
// deciding whether to send).
54 query_interval_seconds_ = driver_cfg_.GetExtension(PBDriverConfig::query_interval_seconds);
// How long an outstanding request may stay unanswered before the sockets are
// closed and reconfigured.
56 reset_interval_seconds_ = driver_cfg_.GetExtension(PBDriverConfig::reset_interval_seconds);
// Queues an outgoing transmission in request_'s outbox, dispatching on the
// transmission type of orig_msg.
// NOTE(review): the signature, the braces, the `else` / `default` lines and the
// declaration of `msg` are not part of this excerpt; `msg` is presumably a
// mutable copy of orig_msg -- confirm.
64 switch (orig_msg.type())
66 case acomms::protobuf::ModemTransmission::DATA:
// Give signal_modify_transmission subscribers a chance to edit the transmission
// before it is inspected.
70 signal_modify_transmission(&msg);
// Branch 1: this modem is the source of the transmission.
72 if (driver_cfg_.modem_id() == msg.src())
// Number the frames from the running counter unless a start index is already set.
74 if (!msg.has_frame_start())
75 msg.set_frame_start(next_frame_);
// Frame size: use the rate_to_bytes entry at index msg.rate() when the rate is
// inside the table ...
78 if (msg.rate() < driver_cfg_.ExtensionSize(PBDriverConfig::rate_to_bytes))
79 msg.set_max_frame_bytes(
80 driver_cfg_.GetExtension(PBDriverConfig::rate_to_bytes, msg.rate()));
// ... and max_frame_size here.
// NOTE(review): this is presumably the `else` branch of the check above; the
// `else` line is absent from this excerpt.
82 msg.set_max_frame_bytes(
83 driver_cfg_.GetExtension(PBDriverConfig::max_frame_size));
// Frame count: only set when the rate indexes into the rate_to_frames table; no
// fallback is visible here.
85 if (msg.rate() < driver_cfg_.ExtensionSize(PBDriverConfig::rate_to_frames))
86 msg.set_max_num_frames(
87 driver_cfg_.GetExtension(PBDriverConfig::rate_to_frames, msg.rate()));
// Fewer frames supplied than allowed: emit signal_data_request so that
// subscribers can add data to msg.
90 if (msg.frame_size() < (
int)msg.max_num_frames())
91 ModemDriverBase::signal_data_request(&msg);
// Advance the running frame counter by the number of frames now in msg.
93 next_frame_ += msg.frame_size();
// Queue the message only if it has at least one frame and the first frame is
// not empty.
96 if (msg.frame_size() && msg.frame(0).size())
98 request_.add_outbox()->CopyFrom(msg);
// Branch 2 (NOTE(review): presumably the `else` of the source check above, i.e.
// another modem is the source): convert the transmission into a poll request.
// The original src/dest are saved in the poll_src/poll_dest extensions, then
// the message is re-addressed from this modem to the original source.
104 msg.SetExtension(PBDriverTransmission::poll_src, msg.src());
105 msg.SetExtension(PBDriverTransmission::poll_dest, msg.dest());
107 msg.set_dest(msg.src());
108 msg.set_src(driver_cfg_.modem_id());
// Tag it as a driver-specific PB_DRIVER_POLL; the response handler in this file
// tests for exactly this type/extension pair.
110 msg.set_type(goby::acomms::protobuf::ModemTransmission::DRIVER_SPECIFIC);
111 msg.SetExtension(PBDriverTransmission::type, PBDriverTransmission::PB_DRIVER_POLL);
113 request_.add_outbox()->CopyFrom(msg);
// Unsupported transmission type (NOTE(review): presumably the `default` case):
// log a warning that includes the original message; nothing is queued by this
// statement.
118 glog.is(DEBUG1) &&
glog << group(glog_out_group()) << warn
119 <<
"Not initiating transmission because we were given an " 120 "invalid transmission type for the base Driver:" 121 << orig_msg.DebugString() << std::endl;
// Periodic work step: services the ZeroMQ sockets, sends the next request when
// one is due, and resets the sockets when a reply is overdue.
// NOTE(review): presumably the body of do_work(); the signature and braces are
// not part of this excerpt.

// Call poll(0) repeatedly until it returns false.
// NOTE(review): the 0 is presumably a zero timeout (non-blocking) -- confirm
// against ZeroMQService::poll.
128 while (zeromq_service_->poll(0)) {}
// Send path: taken only when no reply is outstanding, request_ has all required
// fields set, and more than query_interval_seconds_ have passed since the last
// send. The interval is scaled by 1000000 before being added to
// last_send_time_, so goby_time<uint64>() is presumably in microseconds.
131 if (!waiting_for_reply_ && request_.IsInitialized() &&
132 goby_time<uint64>() >
133 last_send_time_ + 1000000 * static_cast<uint64>(query_interval_seconds_))
// Function-local static counter: request ids keep increasing across calls.
135 static int request_id = 0;
136 request_.set_request_id(request_id++);
137 glog.is(DEBUG1) &&
glog << group(glog_out_group()) <<
"Sending to server." << std::endl;
138 glog.is(DEBUG2) &&
glog << group(glog_out_group()) <<
"Outbox: " << request_.DebugString()
// Transmit on the request socket, remember when, and empty the outbox so the
// queued transmissions are not included in the next request.
140 send(request_, request_socket_id_);
141 last_send_time_ = goby_time<uint64>();
142 request_.clear_outbox();
// Block further sends until waiting_for_reply_ is cleared again.
143 waiting_for_reply_ =
true;
// Timeout path: a request is outstanding and more than reset_interval_seconds_
// have passed since it was sent.
145 else if (waiting_for_reply_ &&
146 goby_time<uint64>() > last_send_time_ + 1e6 * reset_interval_seconds_)
148 glog.is(DEBUG1) &&
glog << group(glog_out_group()) << warn <<
"No response in " 149 << reset_interval_seconds_ <<
" seconds, resetting socket." 151 zeromq_service_->close_all();
// Re-apply the stored service configuration after closing all sockets, then
// allow sending again.
152 zeromq_service_->set_cfg(service_cfg_);
153 waiting_for_reply_ =
false;
// Processes a server response: logs the round-trip time, walks the response's
// inbox, and finally clears waiting_for_reply_.
// NOTE(review): presumably the body of handle_response(response), the callback
// registered through on_receipt<StoreServerResponse>; the signature, the braces
// and the declarations of `msg`, `data_msg` and `ack` are not part of this
// excerpt.

// Elapsed time since the request was sent, divided by 1.0e6 and reported as
// seconds.
159 glog.is(DEBUG1) &&
glog << group(glog_in_group()) <<
"Received response in " 160 << (goby_time<uint64_t>() - last_send_time_) / 1.0e6 <<
" seconds." 163 glog.is(DEBUG2) &&
glog << group(glog_in_group()) <<
"Inbox: " << response.DebugString()
// Visit every inbox entry; the count is read once before the loop starts.
166 for (
int i = 0, n = response.inbox_size(); i < n; ++i)
// Case A: a driver-specific PB_DRIVER_POLL whose poll_src is this modem, i.e.
// a poll asking this modem to transmit.
171 if (msg.type() == goby::acomms::protobuf::ModemTransmission::DRIVER_SPECIFIC &&
172 msg.GetExtension(PBDriverTransmission::type) == PBDriverTransmission::PB_DRIVER_POLL &&
173 msg.GetExtension(PBDriverTransmission::poll_src) == driver_cfg_.modem_id())
// Turn the poll back into a DATA transmission addressed poll_src -> poll_dest
// (NOTE(review): data_msg is presumably a copy of msg -- confirm) ...
176 data_msg.set_type(goby::acomms::protobuf::ModemTransmission::DATA);
177 data_msg.set_src(msg.GetExtension(PBDriverTransmission::poll_src));
178 data_msg.set_dest(msg.GetExtension(PBDriverTransmission::poll_dest));
// ... remove the poll bookkeeping extensions ...
180 data_msg.ClearExtension(PBDriverTransmission::type);
181 data_msg.ClearExtension(PBDriverTransmission::poll_dest);
182 data_msg.ClearExtension(PBDriverTransmission::poll_src);
// ... and run it through the normal initiate-transmission path.
184 handle_initiate_transmission(data_msg);
// Case B: a DATA message addressed to this modem that requests an
// acknowledgement.
// NOTE(review): whether this check sits in an `else` of Case A cannot be told
// from this excerpt.
189 if (msg.dest() == driver_cfg_.modem_id() &&
190 msg.type() == acomms::protobuf::ModemTransmission::DATA && msg.ack_requested())
// Build an ACK with source and destination swapped relative to msg.
193 ack.set_type(goby::acomms::protobuf::ModemTransmission::ACK);
194 ack.set_src(msg.dest());
195 ack.set_dest(msg.src());
// Acknowledge every frame index in [frame_start, frame_start + frame_size).
// This `i` shadows the index of the enclosing inbox loop.
196 for (
int i = msg.frame_start(), n = msg.frame_size() + msg.frame_start(); i < n;
198 ack.add_acked_frame(i);
// NOTE(review): what is done with `ack` (and with `msg` itself) after this
// point is not visible in this excerpt.
// The reply has been handled: clearing this flag re-enables the send path that
// is gated on !waiting_for_reply_.
205 waiting_for_reply_ =
false;
void startup(const acomms::protobuf::DriverConfig &cfg)
Starts the modem driver. Must be called before poll().
void do_work()
Allows the modem driver to do its work.
ReturnType goby_time()
Returns current UTC time as a boost::posix_time::ptime.
void handle_initiate_transmission(const acomms::protobuf::ModemTransmission &m)
Virtual initiate_transmission method. Typically connected to MACManager::signal_initiate_transmission...
common::FlexOstream glog
Access the Goby logger through this object.
google::protobuf::uint64 uint64
an unsigned 64 bit integer
void shutdown()
Shuts down the modem driver.