Note: Goby version 1 (shown here) is now considered obsolete. Please use version 2 for new projects, and consider upgrading old projects.

Goby Underwater Autonomy Project  Series: 1.1, revision: 163, released on 2013-02-06 14:23:27 -0500
moos/pAcommsHandler/pAcommsHandler.cpp
00001 // t. schneider tes@mit.edu 06.05.08
00002 // ocean engineering graudate student - mit / whoi joint program
00003 // massachusetts institute of technology (mit)
00004 // laboratory for autonomous marine sensing systems (lamss)
00005 // 
00006 // this is pAcommsHandler.cpp, part of pAcommsHandler 
00007 //
00008 // see the readme file within this directory for information
00009 // pertaining to usage and purpose of this script.
00010 //
00011 // This program is free software: you can redistribute it and/or modify
00012 // it under the terms of the GNU General Public License as published by
00013 // the Free Software Foundation, either version 3 of the License, or
00014 // (at your option) any later version.
00015 //
00016 // This software is distributed in the hope that it will be useful,
00017 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00018 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019 // GNU General Public License for more details.
00020 //
00021 // You should have received a copy of the GNU General Public License
00022 // along with this software.  If not, see <http://www.gnu.org/licenses/>.
00023 
00024 #include <cctype>
00025 
00026 #include <boost/lexical_cast.hpp>
00027 #include <boost/algorithm/string.hpp>
00028 #include <boost/date_time/posix_time/posix_time.hpp>
00029 #include <boost/numeric/conversion/cast.hpp>
00030 #include <boost/foreach.hpp>
00031 #include <boost/math/special_functions/fpclassify.hpp>
00032 
00033 #include "pAcommsHandler.h"
00034 #include "goby/util/sci.h"
00035 
00036 using namespace goby::util::tcolor;
00037 using goby::acomms::operator<<;
00038 using goby::util::glogger;
00039 using goby::util::as;
00040 using google::protobuf::uint32;
00041 using goby::acomms::NaN;
00042 using goby::acomms::DCCLMessageVal;
00043 
00044 pAcommsHandlerConfig CpAcommsHandler::cfg_;
00045 CpAcommsHandler* CpAcommsHandler::inst_ = 0;
00046 
00047 CpAcommsHandler* CpAcommsHandler::get_instance()
00048 {
00049     if(!inst_)
00050         inst_ = new CpAcommsHandler();
00051     return inst_;
00052 }
00053 
00054 CpAcommsHandler::CpAcommsHandler()
00055     : TesMoosApp(&cfg_),
00056       dccl_(&glogger()),
00057       queue_manager_(&glogger()),
00058       driver_(0),
00059       mac_(&glogger())
00060 {
00061     goby::acomms::connect(&queue_manager_.signal_receive,
00062                           this, &CpAcommsHandler::queue_incoming_data);
00063     goby::acomms::connect(&queue_manager_.signal_receive_ccl,
00064                           this, &CpAcommsHandler::queue_incoming_data);
00065     goby::acomms::connect(&queue_manager_.signal_ack,
00066                           this, &CpAcommsHandler::queue_ack);
00067     goby::acomms::connect(&queue_manager_.signal_data_on_demand,
00068                           this, &CpAcommsHandler::queue_on_demand);
00069     goby::acomms::connect(&queue_manager_.signal_queue_size_change,
00070                           this, &CpAcommsHandler::queue_qsize);
00071     goby::acomms::connect(&queue_manager_.signal_expire,
00072                           this, &CpAcommsHandler::queue_expire);
00073 
00074     process_configuration();
00075 
00076     // bind the lower level pieces of goby-acomms together
00077     if(driver_)
00078     {
00079         goby::acomms::bind(*driver_, queue_manager_);
00080         goby::acomms::bind(mac_, *driver_);
00081 
00082 // bind our methods to the rest of the goby-acomms signals
00083         goby::acomms::connect(&driver_->signal_all_outgoing,
00084                               this, &CpAcommsHandler::modem_raw_out);
00085         goby::acomms::connect(&driver_->signal_all_incoming,
00086                               this, &CpAcommsHandler::modem_raw_in);    
00087         goby::acomms::connect(&driver_->signal_range_reply,
00088                               this, &CpAcommsHandler::modem_range_reply);
00089     }
00090     
00091     
00092     do_subscriptions();
00093 }
00094 
00095 CpAcommsHandler::~CpAcommsHandler()
00096 {}
00097 
00098 void CpAcommsHandler::loop()
00099 {
00100     dccl_loop();
00101     if(driver_) driver_->do_work();
00102     mac_.do_work();
00103     queue_manager_.do_work();    
00104 }
00105 
00106 
00107 void CpAcommsHandler::dccl_loop()
00108 {
00109     std::set<unsigned> ids;
00110     if(dccl_.is_time_trigger(ids))
00111     {
00112         BOOST_FOREACH(unsigned id, ids)
00113         {
00114             goby::acomms::protobuf::ModemDataTransmission mm;
00115             pack(id, &mm);
00116         }
00117     }
00118     if(cfg_.tcp_share_enable() && tcp_share_server_)
00119     {
00120         std::string s;
00121         while(tcp_share_server_->readline(&s))
00122         {
00123             goby::acomms::protobuf::ModemDataTransmission msg;
00124             parse_for_moos(s, &msg);
00125             
00126             glogger() << group("tcp") << "incoming: " << msg << std::endl;
00127             
00128             // post for others to use as needed in MOOS
00129 
00130             goby::acomms::DCCLHeaderDecoder decoder(msg.data());
00131             unsigned dccl_id = decoder[goby::acomms::HEAD_DCCL_ID]; 
00132             std::string in_var = dccl_.get_incoming_hex_var(dccl_id);
00133             
00134             publish(in_var,s);
00135             unpack(msg);
00136         }
00137     }
00138     
00139 }
00140 
00141 
00142 void CpAcommsHandler::do_subscriptions()
00143 // RegisterVariables: register for variables we want to get mail for
00144 {
00145     // non dccl
00146     typedef std::pair<std::string, goby::acomms::protobuf::QueueKey> P;
00147     BOOST_FOREACH(const P& p, out_moos_var2queue_)
00148     {
00149         if(p.second.type() != goby::acomms::protobuf::QUEUE_DCCL)
00150             subscribe(p.first, &CpAcommsHandler::handle_message_push, this);
00151     }
00152         
00153 
00154 
00155     // ping
00156     subscribe(MOOS_VAR_COMMAND_RANGING, &CpAcommsHandler::handle_ranging_request, this);
00157 
00158     // update comms cycle
00159     subscribe(MOOS_VAR_CYCLE_UPDATE, &CpAcommsHandler::handle_mac_cycle_update, this);
00160     subscribe(MOOS_VAR_POLLER_UPDATE, &CpAcommsHandler::handle_mac_cycle_update, this);
00161     
00162     subscribe(MOOS_VAR_FLUSH_QUEUE, &CpAcommsHandler::handle_flush_queue, this);
00163     
00164     
00165     std::set<std::string> enc_vars, dec_vars;
00166 
00167     BOOST_FOREACH(unsigned id, dccl_.all_message_ids())
00168     {
00169         std::set<std::string> vars = dccl_.get_pubsub_encode_vars(id);
00170         enc_vars.insert(vars.begin(), vars.end());
00171         
00172         vars = dccl_.get_pubsub_decode_vars(id);
00173         dec_vars.insert(vars.begin(), vars.end());       
00174     }
00175     BOOST_FOREACH(const std::string& s, dec_vars)
00176     {
00177         if(!s.empty())
00178         {
00179             subscribe(s, &CpAcommsHandler::dccl_inbox, this);
00180             
00181             glogger() << group("dccl_dec")
00182                   <<  "registering (dynamic) for decoding related hex var: {"
00183                   << s << "}" << std::endl;
00184         }
00185     }
00186     BOOST_FOREACH(const std::string& s, enc_vars)
00187     {
00188         if(!s.empty())
00189         {
00190             subscribe(s, &CpAcommsHandler::dccl_inbox, this);
00191             
00192             glogger() << group("dccl_dec") <<  "registering (dynamic) for encoding related hex var: {" << s << "}" << std::endl;
00193         }
00194     }
00195 
00196     glogger() << group("dccl_enc") << dccl_.brief_summary() << std::endl;
00197 }
00198 
00199 
00200 //
00201 // Mail Handlers
00202 //
00203 
00204 
00205 void CpAcommsHandler::dccl_inbox(const CMOOSMsg& msg)
00206 {            
00207     const std::string& key = msg.GetKey();  
00208     bool is_str = msg.IsString();
00209     const std::string& sval = msg.GetString();
00210     
00211     std::set<unsigned> ids;    
00212     unsigned id;
00213     if(dccl_.is_publish_trigger(ids, key, sval))
00214     {
00215         BOOST_FOREACH(unsigned id, ids)
00216         {
00217             goby::acomms::protobuf::ModemDataTransmission mm;
00218             pack(id, &mm);
00219         }
00220     }
00221     else if(dccl_.is_incoming(id, key) && is_str &&
00222             !(msg.GetSource() == GetAppName() && msg.GetCommunity() == goby::util::as<std::string>(cfg_.common().community())))
00223     {
00224         goby::acomms::protobuf::ModemDataTransmission mm;
00225         parse_for_moos(sval, &mm);
00226         unpack(mm);
00227     }
00228 }
00229 
00230 void CpAcommsHandler::handle_ranging_request(const CMOOSMsg& msg)
00231 {
00232     goby::acomms::protobuf::ModemRangingRequest request_msg;
00233     parse_for_moos(msg.GetString(), &request_msg);
00234     glogger() << "ranging request: " << request_msg << std::endl;
00235     if(driver_) driver_->handle_initiate_ranging(&request_msg);
00236 }
00237 
00238 void CpAcommsHandler::handle_flush_queue(const CMOOSMsg& msg)
00239 {
00240     goby::acomms::protobuf::QueueFlush flush;
00241     std::string moos_string = boost::trim_copy(msg.GetString());
00242     // if contains no spaces, assume it is a "key=value," string
00243     if(moos_string.find(" ") == std::string::npos)
00244         from_moos_comma_equals_string(&flush, moos_string);
00245     // assume it is a TextFormat protobuf message
00246     else
00247         parse_for_moos(moos_string, &flush);
00248     
00249     glogger() << "queue flush request: " << flush << std::endl;
00250     queue_manager_.flush_queue(flush);
00251 }
00252 
00253 
00254 void CpAcommsHandler::handle_message_push(const CMOOSMsg& msg)
00255 {
00256     goby::acomms::protobuf::ModemDataTransmission new_message;
00257     parse_for_moos(msg.GetString(), &new_message);
00258 
00259     if(!new_message.base().has_time())
00260         new_message.mutable_base()->set_time(as<std::string>(goby::util::goby_time()));
00261         
00262     goby::acomms::protobuf::QueueKey& qk = out_moos_var2queue_[msg.GetKey()];
00263     
00264     if(new_message.data().empty())
00265         glogger() << warn << "message is either empty or contains invalid data" << std::endl;
00266     else if(!(qk.type() == goby::acomms::protobuf::QUEUE_DCCL))
00267     {
00268         new_message.mutable_queue_key()->CopyFrom(out_moos_var2queue_[msg.GetKey()]);
00269         
00270         std::string serialized;
00271         serialize_for_moos(&serialized, new_message);
00272         publish(MOOS_VAR_OUTGOING_DATA, serialized);
00273         queue_manager_.push_message(new_message);
00274     }
00275 }
00276 
00277 
00278 void CpAcommsHandler::handle_mac_cycle_update(const CMOOSMsg& msg)
00279 {
00280     goby::acomms::protobuf::MACUpdate update_msg;
00281     parse_for_moos(msg.GetString(), &update_msg);
00282     
00283     glogger() << "got update for MAC: " << update_msg << std::endl;
00284 
00285     if(!update_msg.dest() == cfg_.modem_id())
00286     {
00287         glogger() << "update not for us" << std::endl;
00288         return;
00289     }
00290     
00291     switch(update_msg.update_type())
00292     {
00293         case goby::acomms::protobuf::MACUpdate::REPLACE:
00294             mac_.clear_all_slots();
00295             // fall through intentional
00296         case goby::acomms::protobuf::MACUpdate::ADD:
00297             BOOST_FOREACH(const goby::acomms::protobuf::Slot& slot, update_msg.slot())
00298                 mac_.add_slot(slot);
00299             break;
00300 
00301         case goby::acomms::protobuf::MACUpdate::REMOVE:
00302             BOOST_FOREACH(const goby::acomms::protobuf::Slot& slot, update_msg.slot())
00303                 mac_.remove_slot(slot);
00304             break;
00305     }
00306 }
00307 
00308 //
00309 // Callbacks from goby libraries
00310 //
00311 void CpAcommsHandler::queue_qsize(const goby::acomms::protobuf::QueueSize& size)
00312 {
00313     glogger() << "New qsize: " << size << std::endl;
00314     
00315     std::string serialized;
00316     serialize_for_moos(&serialized, size);
00317     
00318     publish(MOOS_VAR_QSIZE, serialized);
00319 }
00320 void CpAcommsHandler::queue_expire(const goby::acomms::protobuf::ModemDataExpire& message)
00321 {
00322     std::string serialized;
00323     serialize_for_moos(&serialized, message);
00324     
00325     publish(MOOS_VAR_EXPIRE, serialized);
00326 }
00327 
00328 void CpAcommsHandler::queue_ack(const goby::acomms::protobuf::ModemDataAck& message)
00329 {
00330     std::string serialized;
00331     serialize_for_moos(&serialized, message);
00332     
00333     publish(MOOS_VAR_ACK, serialized);
00334 }
00335 
00336 
00337 
00338 void CpAcommsHandler::queue_incoming_data(const goby::acomms::protobuf::ModemDataTransmission& message)
00339 {
00340     std::string serialized;
00341     serialize_for_moos(&serialized, message);        
00342     CMOOSMsg m(MOOS_NOTIFY, MOOS_VAR_INCOMING_DATA, serialized, -1);
00343     publish(m);
00344 
00345     // we know what this type is
00346     if(in_queue2moos_var_.count(message.queue_key()))
00347     {
00348         CMOOSMsg m_specific(MOOS_NOTIFY, in_queue2moos_var_[message.queue_key()], serialized, -1);
00349         
00350         publish(m_specific);
00351     
00352         glogger() << group("q_in") << "published received data to "
00353                   << in_queue2moos_var_[message.queue_key()] << ": " << message << std::endl;
00354         
00355         if(message.queue_key().type() == goby::acomms::protobuf::QUEUE_DCCL)
00356             unpack(message);
00357     }
00358 }
00359 
00360 
00361 void CpAcommsHandler::queue_on_demand(const goby::acomms::protobuf::ModemDataRequest& request_msg, goby::acomms::protobuf::ModemDataTransmission* data_msg)
00362 {
00363     pack(data_msg->queue_key().id(), data_msg);
00364 }
00365 
00366 
00367 
00368 void CpAcommsHandler::modem_raw_in(const goby::acomms::protobuf::ModemMsgBase& base_msg)
00369 {
00370     if(base_msg.raw().length() < MAX_MOOS_PACKET)
00371         publish(MOOS_VAR_NMEA_IN, base_msg.raw());
00372 }
00373 
00374 void CpAcommsHandler::modem_raw_out(const goby::acomms::protobuf::ModemMsgBase& base_msg)
00375 {
00376     std::string out;
00377     serialize_for_moos(&out, base_msg);
00378         
00379     if(base_msg.raw().length() < MAX_MOOS_PACKET)
00380         publish(MOOS_VAR_NMEA_OUT, base_msg.raw());
00381     
00382 }
00383 
00384 void CpAcommsHandler::modem_range_reply(const goby::acomms::protobuf::ModemRangingReply& message)
00385 {
00386     glogger() << "got range reply: " << message << std::endl;    
00387     std::string serialized;
00388     serialize_for_moos(&serialized, message);
00389     publish(MOOS_VAR_RANGING, serialized);
00390 }
00391 
00392 
00393 //
00394 // READ CONFIGURATION
00395 //
00396 
00397 void CpAcommsHandler::process_configuration()
00398 {
00399     // create driver object
00400     switch(cfg_.driver_type())
00401     {
00402         case pAcommsHandlerConfig::DRIVER_WHOI_MICROMODEM:
00403             driver_ = new goby::acomms::MMDriver(&glogger());
00404             break;
00405 
00406         case pAcommsHandlerConfig::DRIVER_ABC_EXAMPLE_MODEM:
00407             driver_ = new goby::acomms::ABCDriver(&glogger());
00408             break;
00409             
00410         case pAcommsHandlerConfig::DRIVER_NONE: break;
00411     }
00412 
00413     // add groups to flexostream human terminal output
00414     mac_.add_flex_groups(&glogger());
00415     dccl_.add_flex_groups(&glogger());
00416     queue_manager_.add_flex_groups(&glogger());
00417     if(driver_) driver_->add_flex_groups(&glogger());
00418     glogger().add_group("tcp", goby::util::Colors::green, "tcp share");
00419     
00420     if(cfg_.has_modem_id_lookup_path())
00421         glogger() << modem_lookup_.read_lookup_file(cfg_.modem_id_lookup_path()) << std::endl;
00422     else
00423         glogger() << warn << "no modem_id_lookup_path in moos file. this is required for conversions between modem_id and vehicle name / type." << std::endl;
00424     
00425     glogger() << "reading in geodesy information: " << std::endl;
00426     if (!cfg_.common().has_lat_origin() || !cfg_.common().has_lon_origin())
00427     {
00428         glogger() << die << "no lat_origin or lon_origin specified in configuration. this is required for geodesic conversion" << std::endl;
00429     }
00430     else
00431     {
00432         if(geodesy_.Initialise(cfg_.common().lat_origin(), cfg_.common().lon_origin()))
00433             glogger() << "success!" << std::endl;
00434         else
00435             glogger() << die << "could not initialize geodesy" << std::endl;
00436     }
00437 
00438     // check and propogate modem id
00439     if(cfg_.modem_id() == goby::acomms::BROADCAST_ID)
00440         glogger() << die << "modem_id = " << goby::acomms::BROADCAST_ID << " is reserved for broadcast messages. You must specify a modem_id != " << goby::acomms::BROADCAST_ID << " for this vehicle." << std::endl;
00441     
00442     publish("MODEM_ID", cfg_.modem_id());    
00443     publish("VEHICLE_ID", cfg_.modem_id());    
00444     
00445     cfg_.mutable_queue_cfg()->set_modem_id(cfg_.modem_id());
00446     cfg_.mutable_mac_cfg()->set_modem_id(cfg_.modem_id());
00447     cfg_.mutable_dccl_cfg()->set_modem_id(cfg_.modem_id());
00448     cfg_.mutable_driver_cfg()->set_modem_id(cfg_.modem_id());
00449     
00450     // do a unique merge of the message files for dccl and queue
00451     for(int i = 0, n = cfg_.dccl_cfg().message_file_size(); i < n; ++i)
00452     {
00453         bool exists = false;
00454         for(int j = 0, o = cfg_.queue_cfg().message_file_size(); j < o; ++j)        
00455         {
00456             if(cfg_.queue_cfg().message_file(j).path() == cfg_.dccl_cfg().message_file(i).path())
00457                 exists = true;
00458         }
00459         
00460         if(!exists)
00461             cfg_.mutable_queue_cfg()->add_message_file()->CopyFrom(cfg_.dccl_cfg().message_file(i));
00462         
00463     }
00464 
00465     for(int i = 0, n = cfg_.queue_cfg().message_file_size(); i < n; ++i)
00466     {
00467         bool exists = false;
00468         for(int j = 0, o = cfg_.dccl_cfg().message_file_size(); j < o; ++j)        
00469         {
00470             if(cfg_.dccl_cfg().message_file(j).path() == cfg_.queue_cfg().message_file(i).path())
00471                 exists = true;
00472         }
00473         
00474         if(!exists)
00475             cfg_.mutable_dccl_cfg()->add_message_file()->CopyFrom(cfg_.queue_cfg().message_file(i));
00476         
00477     }    
00478 
00479     // start goby-acomms classes
00480     if(driver_) driver_->startup(cfg_.driver_cfg());
00481     mac_.startup(cfg_.mac_cfg());
00482     queue_manager_.set_cfg(cfg_.queue_cfg());
00483     dccl_.set_cfg(cfg_.dccl_cfg());    
00484 
00485     // initialize maps between incoming / outgoing MOOS variables and DCCL ids
00486     std::set<unsigned> ids = dccl_.all_message_ids();
00487     BOOST_FOREACH(unsigned id, ids)
00488     {
00489         goby::acomms::protobuf::QueueKey key;
00490         key.set_type(goby::acomms::protobuf::QUEUE_DCCL);
00491         key.set_id(id);
00492         out_moos_var2queue_[dccl_.get_outgoing_hex_var(id)] = key;
00493         in_queue2moos_var_[key] = dccl_.get_incoming_hex_var(id);
00494     }
00495     
00496     for(int i = 0, n = cfg_.queue_cfg().queue_size(); i < n; ++i)
00497     {
00498         in_queue2moos_var_[cfg_.queue_cfg().queue(i).key()] = cfg_.queue_cfg().queue(i).in_pubsub_var();
00499         out_moos_var2queue_[cfg_.queue_cfg().queue(i).out_pubsub_var()] =
00500             cfg_.queue_cfg().queue(i).key();
00501 
00502     }
00503     
00504 
00505     // tcp share
00506     if(cfg_.tcp_share_enable())
00507     {
00508         glogger() << group("tcp") << "tcp_share_port is: " << cfg_.tcp_share_port() << std::endl;
00509         
00510         tcp_share_server_ = new goby::util::TCPServer(cfg_.tcp_share_port());
00511         tcp_share_server_->start(); 
00512         glogger() << group("tcp") << "starting TCP server on: " << cfg_.tcp_share_port() << std::endl;        
00513     }
00514 
00515     BOOST_FOREACH(const std::string& full_ip, cfg_.tcp_share_to_ip())
00516     {
00517         if(!cfg_.tcp_share_enable())
00518         {            
00519             glogger() << group("tcp") << "tcp_share_in_address given but tcp_enable is false" << std::endl;
00520         }
00521         else
00522         {
00523             std::string ip;
00524             unsigned port = cfg_.tcp_share_port();
00525 
00526             std::string::size_type colon_pos = full_ip.find(":");
00527             if(colon_pos == std::string::npos)
00528                 ip = full_ip;
00529             else
00530             {
00531                 ip = full_ip.substr(0, colon_pos);
00532                 port = boost::lexical_cast<unsigned>(full_ip.substr(colon_pos+1));
00533             }
00534             IP ip_and_port(ip, port);
00535             tcp_share_map_[ip_and_port] = new goby::util::TCPClient(ip, port);
00536             tcp_share_map_[ip_and_port]->start();
00537             glogger() << group("tcp") << "starting TCP client to "<<  ip << ":" << port << std::endl;
00538         }
00539     }
00540     
00541     // set up algorithms
00542     dccl_.add_algorithm("power_to_dB", boost::bind(&CpAcommsHandler::alg_power_to_dB, this, _1));
00543     dccl_.add_algorithm("dB_to_power", boost::bind(&CpAcommsHandler::alg_dB_to_power, this, _1));
00544 
00545     dccl_.add_adv_algorithm("TSD_to_soundspeed", boost::bind(&CpAcommsHandler::alg_TSD_to_soundspeed, this, _1, _2));
00546     dccl_.add_adv_algorithm("subtract", boost::bind(&CpAcommsHandler::alg_subtract, this, _1, _2));
00547     dccl_.add_adv_algorithm("add", boost::bind(&CpAcommsHandler::alg_add, this, _1, _2));
00548     dccl_.add_algorithm("abs", boost::bind(&CpAcommsHandler::alg_abs, this, _1));    
00549     
00550     dccl_.add_algorithm("to_lower", boost::bind(&CpAcommsHandler::alg_to_lower, this, _1));
00551     dccl_.add_algorithm("to_upper", boost::bind(&CpAcommsHandler::alg_to_upper, this, _1));
00552     dccl_.add_algorithm("angle_0_360", boost::bind(&CpAcommsHandler::alg_angle_0_360, this, _1));
00553     dccl_.add_algorithm("angle_-180_180", boost::bind(&CpAcommsHandler::alg_angle_n180_180, this, _1));
00554     
00555     dccl_.add_adv_algorithm("lat2utm_y", boost::bind(&CpAcommsHandler::alg_lat2utm_y, this, _1, _2));
00556     dccl_.add_adv_algorithm("lon2utm_x", boost::bind(&CpAcommsHandler::alg_lon2utm_x, this, _1, _2));
00557     dccl_.add_adv_algorithm("utm_x2lon", boost::bind(&CpAcommsHandler::alg_utm_x2lon, this, _1, _2));
00558     dccl_.add_adv_algorithm("utm_y2lat", boost::bind(&CpAcommsHandler::alg_utm_y2lat, this, _1, _2));
00559     dccl_.add_algorithm("modem_id2name", boost::bind(&CpAcommsHandler::alg_modem_id2name, this, _1));
00560     dccl_.add_algorithm("modem_id2type", boost::bind(&CpAcommsHandler::alg_modem_id2type, this, _1));
00561     dccl_.add_algorithm("name2modem_id", boost::bind(&CpAcommsHandler::alg_name2modem_id, this, _1));
00562 
00563     dccl_.add_algorithm("lat2hemisphere_initial", boost::bind(&CpAcommsHandler::alg_lat2hemisphere_initial, this, _1));
00564     dccl_.add_algorithm("lon2hemisphere_initial", boost::bind(&CpAcommsHandler::alg_lon2hemisphere_initial, this, _1));
00565 
00566     dccl_.add_algorithm("lat2nmea_lat", boost::bind(&CpAcommsHandler::alg_lat2nmea_lat, this, _1));
00567     dccl_.add_algorithm("lon2nmea_lon", boost::bind(&CpAcommsHandler::alg_lon2nmea_lon, this, _1));
00568 
00569     dccl_.add_algorithm("unix_time2nmea_time", boost::bind(&CpAcommsHandler::alg_unix_time2nmea_time, this, _1));
00570     
00571 }
00572 
00573 
00574 void CpAcommsHandler::pack(unsigned dccl_id, goby::acomms::protobuf::ModemDataTransmission* modem_message)
00575 {
00576     // don't bother packing if we can't encode this
00577     if(dccl_.manip_manager().has(dccl_id, goby::acomms::protobuf::MessageFile::NO_ENCODE))
00578         return;
00579     
00580     // encode the message and notify the MOOSDB
00581     std::map<std::string, std::vector<DCCLMessageVal> >& in = repeat_buffer_[dccl_id];
00582 
00583     // first entry
00584     if(!repeat_count_.count(dccl_id))
00585         repeat_count_[dccl_id] = 0;
00586 
00587     ++repeat_count_[dccl_id];        
00588     
00589     BOOST_FOREACH(const std::string& moos_var, dccl_.get_pubsub_src_vars(dccl_id))
00590     {
00591         const CMOOSMsg& moos_msg = dynamic_vars()[moos_var];
00592         
00593         bool is_dbl = valid(moos_msg) ? moos_msg.IsDouble() : false;
00594         bool is_str = valid(moos_msg) ? moos_msg.IsString() : false;
00595         double dval = valid(moos_msg) ? moos_msg.GetDouble() : NaN;
00596         std::string sval = valid(moos_msg) ? moos_msg.GetString() : "";
00597         
00598         if(is_str)
00599             in[moos_var].push_back(sval);
00600         else if(is_dbl)
00601             in[moos_var].push_back(dval);
00602         else
00603             in[moos_var].push_back(DCCLMessageVal());
00604     }
00605 
00606     // send this message out the door
00607     if(repeat_count_[dccl_id] == dccl_.get_repeat(dccl_id))
00608     {
00609         try
00610         {
00611             // encode
00612             dccl_.pubsub_encode(dccl_id, modem_message, in);
00613 
00614             std::string out_var = dccl_.get_outgoing_hex_var(dccl_id);
00615             
00616             std::string serialized;
00617             serialize_for_moos(&serialized, *modem_message);        
00618             publish(MOOS_VAR_OUTGOING_DATA, serialized);
00619             publish(out_var, serialized);
00620             
00621             modem_message->mutable_queue_key()->set_type(goby::acomms::protobuf::QUEUE_DCCL);
00622             modem_message->mutable_queue_key()->set_id(dccl_id);            
00623             queue_manager_.push_message(*modem_message);
00624             
00625             handle_tcp_share(modem_message);
00626         }
00627         catch(std::exception& e)
00628         {
00629             glogger() << group("dccl_enc") << warn << "could not encode message: " << *modem_message << ", reason: " << e.what() << std::endl;
00630         }
00631 
00632         // flush buffer
00633         repeat_buffer_[dccl_id].clear();
00634         // reset counter
00635         repeat_count_[dccl_id] = 0;
00636     }
00637     else
00638     {
00639         glogger() << group("dccl_enc") << "finished buffering part " << repeat_count_[dccl_id] << " of " <<  dccl_.get_repeat(dccl_id) << " for repeated message: " << dccl_.id2name(dccl_id) << ". Nothing has been sent yet." << std::endl;
00640     }
00641     
00642 }
00643 
00644 void CpAcommsHandler::unpack(goby::acomms::protobuf::ModemDataTransmission modem_message)
00645 {
00646     handle_tcp_share(&modem_message);
00647 
00648     try
00649     {
00650         if(modem_message.base().dest() != cfg_.modem_id() && modem_message.base().dest() != goby::acomms::BROADCAST_ID && !dccl_.manip_manager().has(modem_message.queue_key().id(), goby::acomms::protobuf::MessageFile::PROMISCUOUS))
00651         {
00652             glogger() << group("dccl_dec") << "ignoring message for modem_id " << modem_message.base().dest() << std::endl;
00653             return;
00654         }
00655         
00656         std::multimap<std::string, DCCLMessageVal> out;
00657         
00658         dccl_.pubsub_decode(modem_message, &out);
00659         
00660         typedef std::pair<std::string, DCCLMessageVal> P;
00661         BOOST_FOREACH(const P& p, out)
00662         {
00663             if(p.second.type() == goby::acomms::cpp_double)
00664             {
00665                 double dval = p.second;
00666                 CMOOSMsg m(MOOS_NOTIFY, p.first, dval, -1);
00667                 publish(m);
00668             }
00669             else
00670             {
00671                 std::string sval = p.second;
00672                 CMOOSMsg m(MOOS_NOTIFY, p.first, sval, -1);
00673                 publish(m);   
00674             }
00675         }
00676     }
00677     catch(std::exception& e)
00678     {
00679         glogger() << group("dccl_dec") << warn << "could not decode message: " << modem_message << ", reason: " << e.what() << std::endl;
00680     }    
00681 }
00682 
00683 
00684 
00685 void CpAcommsHandler::handle_tcp_share(goby::acomms::protobuf::ModemDataTransmission* modem_message)
00686 {
00687     goby::acomms::DCCLHeaderDecoder decoder(modem_message->data());
00688     unsigned dccl_id = decoder[goby::acomms::HEAD_DCCL_ID];
00689     
00690     if(cfg_.tcp_share_enable() && dccl_.manip_manager().has(dccl_id, goby::acomms::protobuf::MessageFile::TCP_SHARE_IN))
00691     {
00692         typedef std::pair<IP, goby::util::TCPClient*> P;
00693         BOOST_FOREACH(const P&p, tcp_share_map_)
00694         {
00695             if(p.second->active())
00696             {
00697                 std::stringstream ss;
00698                 ss << p.second->local_endpoint() << ":" << cfg_.tcp_share_port();
00699                 modem_message->AddExtension(pAcommsHandlerExtensions::seen_ip, ss.str());                
00700                 
00701                 std::string serialized;
00702                 serialize_for_moos(&serialized, *modem_message);
00703 
00704                 bool ip_seen = false;
00705                 for(int i = 0, n = modem_message->ExtensionSize(pAcommsHandlerExtensions::seen_ip); i < n; ++i)
00706                 {
00707                     if(modem_message->GetExtension(pAcommsHandlerExtensions::seen_ip, i) == p.first.ip_and_port())
00708                         ip_seen = true;
00709                 }
00710                 
00711                 
00712                 if(!ip_seen)
00713                 {
00714                     glogger() << group("tcp") << "dccl_id: " << dccl_id << ": " << *modem_message << " to " << p.first.ip << ":" << p.first.port << std::endl;
00715                     p.second->write(serialized + "\r\n");
00716                 }
00717                 else
00718                 {
00719                     glogger() << group("tcp") << p.first.ip << ":" << p.first.port << " has already seen this message, so not sending." << std::endl;
00720                 }
00721             }
00722             else
00723                 glogger() << group("tcp") << warn << p.first.ip << ":" << p.first.port << " is not connected." << std::endl;
00724         }
00725     }    
00726 }
00727 
00728 
00729 //
00730 // DCCL ALGORITHMS
00731 //
00732 
00733 
00734 void CpAcommsHandler::alg_power_to_dB(DCCLMessageVal& val_to_mod)
00735 {
00736     val_to_mod = 10*log10(double(val_to_mod));
00737 }
00738 
00739 void CpAcommsHandler::alg_dB_to_power(DCCLMessageVal& val_to_mod)
00740 {
00741     val_to_mod = pow(10.0, double(val_to_mod)/10.0);
00742 }
00743 
00744 // applied to "T" (temperature), references are "S" (salinity), then "D" (depth)
00745 void CpAcommsHandler::alg_TSD_to_soundspeed(DCCLMessageVal& val,
00746                            const std::vector<DCCLMessageVal>& ref_vals)
00747 {
00748     val.set(goby::util::mackenzie_soundspeed(val, ref_vals[0], ref_vals[1]), 3);
00749 }
00750 
00751 // operator-=
00752 void CpAcommsHandler::alg_subtract(DCCLMessageVal& val,
00753                                  const std::vector<DCCLMessageVal>& ref_vals)
00754 {
00755     double d = val;
00756     BOOST_FOREACH(const::DCCLMessageVal& mv, ref_vals)
00757         d -= double(mv);
00758     
00759     val.set(d, val.precision());
00760 }
00761 
00762 // operator+=
00763 void CpAcommsHandler::alg_add(DCCLMessageVal& val,
00764                             const std::vector<DCCLMessageVal>& ref_vals)
00765 {
00766     double d = val;
00767     BOOST_FOREACH(const::DCCLMessageVal& mv, ref_vals)
00768         d += double(mv);
00769     val.set(d, val.precision());
00770 }
00771 
00772 
00773 
00774 void CpAcommsHandler::alg_to_upper(DCCLMessageVal& val_to_mod)
00775 {
00776     val_to_mod = boost::algorithm::to_upper_copy(std::string(val_to_mod));
00777 }
00778 
00779 void CpAcommsHandler::alg_to_lower(DCCLMessageVal& val_to_mod)
00780 {
00781     val_to_mod = boost::algorithm::to_lower_copy(std::string(val_to_mod));
00782 }
00783 
00784 void CpAcommsHandler::alg_angle_0_360(DCCLMessageVal& angle)
00785 {
00786     double a = angle;
00787     while (a < 0)
00788         a += 360;
00789     while (a >=360)
00790         a -= 360;
00791     angle = a;
00792 }
00793 
00794 void CpAcommsHandler::alg_angle_n180_180(DCCLMessageVal& angle)
00795 {
00796     double a = angle;
00797     while (a < -180)
00798         a += 360;
00799     while (a >=180)
00800         a -= 360;
00801     angle = a;
00802 }
00803 
00804 void CpAcommsHandler::alg_lat2utm_y(DCCLMessageVal& mv,
00805                                     const std::vector<DCCLMessageVal>& ref_vals)
00806 {
00807     double lat = mv;
00808     double lon = ref_vals[0];
00809     double x = NaN;
00810     double y = NaN;
00811         
00812     if(!(boost::math::isnan)(lat) && !(boost::math::isnan)(lon)) geodesy_.LatLong2LocalUTM(lat, lon, y, x);        
00813     mv = y;
00814 }
00815 
00816 void CpAcommsHandler::alg_lon2utm_x(DCCLMessageVal& mv,
00817                                     const std::vector<DCCLMessageVal>& ref_vals)
00818 {
00819     double lon = mv;
00820     double lat = ref_vals[0];
00821     double x = NaN;
00822     double y = NaN;
00823 
00824     if(!(boost::math::isnan)(lat) && !(boost::math::isnan)(lon)) geodesy_.LatLong2LocalUTM(lat, lon, y, x);
00825     mv = x;
00826 }
00827 
00828 
00829 void CpAcommsHandler::alg_utm_x2lon(DCCLMessageVal& mv,
00830                                     const std::vector<DCCLMessageVal>& ref_vals)
00831 {
00832     double x = mv;
00833     double y = ref_vals[0];
00834 
00835     double lat = NaN;
00836     double lon = NaN;
00837     if(!(boost::math::isnan)(y) && !(boost::math::isnan)(x)) geodesy_.UTM2LatLong(x, y, lat, lon);    
00838     mv = lon;
00839 }
00840 
00841 void CpAcommsHandler::alg_utm_y2lat(DCCLMessageVal& mv,
00842                                     const std::vector<DCCLMessageVal>& ref_vals)
00843 {
00844     double y = mv;
00845     double x = ref_vals[0];
00846     
00847     double lat = NaN;
00848     double lon = NaN;
00849     if(!(boost::math::isnan)(x) && !(boost::math::isnan)(y)) geodesy_.UTM2LatLong(x, y, lat, lon);    
00850     mv = lat;
00851 }
00852 
00853 
00854  void CpAcommsHandler::alg_modem_id2name(DCCLMessageVal& in)
00855  {
00856     bool is_numeric = true;
00857     BOOST_FOREACH(const char c, std::string(in))
00858     {
00859         if(!isdigit(c))
00860         {
00861             is_numeric = false;
00862             break;
00863         }
00864     }
00865     if(is_numeric) in = modem_lookup_.get_name_from_id(boost::lexical_cast<unsigned>(std::string(in)));
00866  }
00867  
00868  void CpAcommsHandler::alg_modem_id2type(DCCLMessageVal& in)
00869  {
00870     bool is_numeric = true;
00871     BOOST_FOREACH(const char c, std::string(in))
00872     {
00873         if(!isdigit(c))
00874         {
00875             is_numeric = false;
00876             break;
00877         }    
00878     }
00879     if(is_numeric) in = modem_lookup_.get_type_from_id(boost::lexical_cast<unsigned>(std::string(in)));
00880 
00881  }
00882 
00883 void CpAcommsHandler::alg_name2modem_id(DCCLMessageVal& in)
00884 {
00885     std::stringstream ss;
00886     ss << modem_lookup_.get_id_from_name(std::string(in));
00887     in = ss.str();
00888 }
00889 
00890 void CpAcommsHandler::alg_lat2hemisphere_initial(goby::acomms::DCCLMessageVal& val_to_mod)
00891 {
00892     double lat = val_to_mod;
00893     if(lat < 0)
00894         val_to_mod = "S";
00895     else
00896         val_to_mod = "N";        
00897 }
00898 
00899 void CpAcommsHandler::alg_lon2hemisphere_initial(goby::acomms::DCCLMessageVal& val_to_mod)
00900 {
00901     double lon = val_to_mod;
00902     if(lon < 0)
00903         val_to_mod = "W";
00904     else
00905         val_to_mod = "E";
00906 }
00907 
00908 void CpAcommsHandler::alg_abs(goby::acomms::DCCLMessageVal& val_to_mod)
00909 {
00910     val_to_mod = std::abs(double(val_to_mod));
00911 }
00912 
00913 void CpAcommsHandler::alg_unix_time2nmea_time(goby::acomms::DCCLMessageVal& val_to_mod)
00914 {
00915     double unix_time = val_to_mod;
00916     boost::posix_time::ptime ptime = goby::util::unix_double2ptime(unix_time);
00917 
00918     // HHMMSS.SSSSSS
00919     boost::format f("%02d%02d%02d.%06d");
00920     f % ptime.time_of_day().hours() % ptime.time_of_day().minutes() % ptime.time_of_day().seconds() % (ptime.time_of_day().fractional_seconds() * 1000000 / boost::posix_time::time_duration::ticks_per_second());
00921     
00922     val_to_mod = f.str();
00923 }
00924 
00925 
00926 void CpAcommsHandler::alg_lat2nmea_lat(goby::acomms::DCCLMessageVal& val_to_mod)
00927 {
00928     double lat = val_to_mod;
00929 
00930     // DDMM.MM
00931     boost::format f("%02d%02d.%04d");
00932 
00933     int degrees = std::floor(lat);
00934     int minutes = std::floor((lat - degrees) * 60);
00935     int ten_thousandth_minutes = std::floor(((lat - degrees) * 60 - minutes) * 10000);
00936 
00937     f % degrees % minutes % ten_thousandth_minutes;
00938     
00939     val_to_mod = f.str();
00940 }
00941 
00942 
00943     
00944 void CpAcommsHandler::alg_lon2nmea_lon(goby::acomms::DCCLMessageVal& val_to_mod)
00945 {
00946     double lon = val_to_mod;
00947 
00948     // DDDMM.MM
00949     boost::format f("%03d%02d.%04d");
00950 
00951     int degrees = std::floor(lon);
00952     int minutes = std::floor((lon - degrees) * 60);
00953     int ten_thousandth_minutes = std::floor(((lon - degrees) * 60 - minutes) * 10000);
00954 
00955     f % degrees % minutes % ten_thousandth_minutes;
00956 
00957     val_to_mod = f.str();
00958 }
00959 
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends