Note: Goby version 1 (shown here) is now considered obsolete. Please use version 2 for new projects, and consider upgrading old projects.

Goby Underwater Autonomy Project  Series: 1.1, revision: 163, released on 2013-02-06 14:23:27 -0500
acomms/libqueue/queue_manager.cpp
00001 // copyright 2009 t. schneider tes@mit.edu 
00002 //
00003 // this file is part of the Queue Library (libqueue),
00004 // the goby-acomms message queue manager. goby-acomms is a collection of 
00005 // libraries for acoustic underwater networking
00006 //
00007 // This program is free software: you can redistribute it and/or modify
00008 // it under the terms of the GNU General Public License as published by
00009 // the Free Software Foundation, either version 3 of the License, or
00010 // (at your option) any later version.
00011 //
00012 // This software is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 // GNU General Public License for more details.
00016 //
00017 // You should have received a copy of the GNU General Public License
00018 // along with this software.  If not, see <http://www.gnu.org/licenses/>.
00019 
00020 #include <boost/foreach.hpp>
00021 
00022 #include "goby/acomms/xml/xml_parser.h"
00023 #include "goby/util/logger.h"
00024 #include "goby/util/time.h"
00025 #include "goby/util/binary.h"
00026 
00027 #include "goby/acomms/xml/xml_parser.h"
00028 #include "queue_manager.h"
00029 #include "queue_constants.h"
00030 #include "queue_xml_callbacks.h"
00031 
00032 int goby::acomms::QueueManager::modem_id_ = 0;
00033 
00034 goby::acomms::QueueManager::QueueManager(std::ostream* log /* =0 */)
00035     : log_(log),
00036       packet_ack_(0),
00037       packet_dest_(BROADCAST_ID)
00038 {}
00039 
00040 void goby::acomms::QueueManager::add_queue(const protobuf::QueueConfig& cfg)
00041 {
00042     Queue q(cfg, log_, modem_id_);
00043     
00044     if(queues_.count(cfg.key()))
00045     {
00046         std::stringstream ss;
00047         ss << "Queue: duplicate key specified for key: " << cfg.key();
00048         throw queue_exception(ss.str());
00049     }
00050     else if((q.cfg().key().id() > MAX_ID || q.cfg().key().id() < MIN_ID) && q.cfg().key().type() == protobuf::QUEUE_DCCL)
00051     {
00052         std::stringstream ss;
00053         ss << "Queue: key (" << cfg.key() << ") is out of bounds for use with libqueue. Use a id from " << MIN_ID << " to " << MAX_ID;
00054         throw queue_exception(ss.str());
00055     }
00056     else
00057     {
00058         std::pair<std::map<goby::acomms::protobuf::QueueKey, Queue>::iterator,bool> new_q_pair =
00059             queues_.insert(std::make_pair(cfg.key(), q));
00060 
00061         qsize(&((new_q_pair.first)->second));
00062     }
00063 
00064     
00065     
00066     if(log_) *log_<< group("q_out") << "added new queue: \n" << q << std::endl;
00067     
00068 }
00069 
00070 std::set<unsigned> goby::acomms::QueueManager::add_xml_queue_file(const std::string& xml_file)
00071 {
00072     std::vector<protobuf::QueueConfig> cfgs;
00073 
00074     // Register handlers for XML parsing
00075     QueueContentHandler content(cfgs);
00076     QueueErrorHandler error;
00077     // instantiate a parser for the xml message files
00078     XMLParser parser(content, error);
00079     std::set<unsigned> added_ids;
00080     
00081     parser.parse(xml_file);
00082     BOOST_FOREACH(const protobuf::QueueConfig& c, cfgs)
00083     {
00084         add_queue(c);
00085         added_ids.insert(c.key().id());
00086     }
00087     
00088     return added_ids;
00089 }
00090 
00091 void goby::acomms::QueueManager::do_work()
00092 {
00093     typedef std::pair<const protobuf::QueueKey, Queue> P;
00094     for(std::map<protobuf::QueueKey, Queue>::iterator it = queues_.begin(), n = queues_.end(); it != n; ++it)
00095     {
00096         std::vector<protobuf::ModemDataTransmission> expired_msgs = it->second.expire();
00097 
00098         protobuf::ModemDataExpire expire;
00099         protobuf::ModemDataTransmission* data_msg = expire.mutable_orig_msg();
00100         BOOST_FOREACH(*data_msg, expired_msgs)
00101         {
00102             //expire.mutable_orig_msg()->mutable_queue_key()->CopyFrom(it->first);
00103             signal_expire(expire);
00104         }
00105         
00106     }
00107     
00108 }
00109 
00110 void goby::acomms::QueueManager::push_message(const protobuf::ModemDataTransmission& data_msg)
00111 {
00112     // message is to us, auto-loopback
00113     if(data_msg.base().dest() == modem_id_)
00114     {
00115         if(log_) *log_<< group("q_out") << "outgoing message is for us: using loopback, not physical interface" << std::endl;
00116         
00117         handle_modem_receive(data_msg);
00118     }
00119 // we have a queue with this key, so push message for sending
00120     else if(queues_.count(data_msg.queue_key()))
00121     {
00122         if(data_msg.queue_key().type() == protobuf::QUEUE_DCCL && manip_manager_.has(data_msg.queue_key().id(), protobuf::MessageFile::NO_QUEUE))
00123         {
00124             if(log_) *log_ << group("q_out") << "not queuing DCCL ID: " << data_msg.queue_key().id() << "; NO_QUEUE manipulator is set" << std::endl;
00125         }
00126         else
00127         {
00128             queues_[data_msg.queue_key()].push_message(data_msg);
00129             qsize(&queues_[data_msg.queue_key()]);
00130         }        
00131 
00132         if(data_msg.queue_key().type() == protobuf::QUEUE_DCCL && manip_manager_.has(data_msg.queue_key().id(), protobuf::MessageFile::LOOPBACK))
00133         {
00134             if(log_) *log_ << group("q_out") << data_msg.queue_key() << " LOOPBACK manipulator set, sending back to decoder" << std::endl;
00135             handle_modem_receive(data_msg);
00136         }        
00137     }
00138     else
00139     {
00140         std::stringstream ss;
00141         ss << "no queue for key: " << data_msg.queue_key();
00142         throw queue_exception(ss.str());
00143     }
00144 }
00145 
00146 
00147 void goby::acomms::QueueManager::flush_queue(const protobuf::QueueFlush& flush)
00148 {
00149     std::map<goby::acomms::protobuf::QueueKey, Queue>::iterator it = queues_.find(flush.key());
00150     
00151     if(it != queues_.end())
00152     {
00153         it->second.flush();
00154         if(log_) *log_ << group("q_out") <<  " flushed queue: " << flush << std::endl;
00155         qsize(&it->second);
00156     }    
00157     else
00158     {
00159         if(log_) *log_ << group("q_out") << warn << " cannot find queue to flush: " << flush << std::endl;
00160     }
00161 }
00162 
00163 
00164 std::string goby::acomms::QueueManager::summary() const
00165 {
00166     std::string s;
00167     typedef std::pair<const protobuf::QueueKey, Queue> P;
00168     BOOST_FOREACH(const P& p, queues_)
00169         s += p.second.summary();
00170 
00171     return s;
00172 }
00173 
00174     
00175 std::ostream& goby::acomms::operator<< (std::ostream& out, const QueueManager& d)
00176 {
00177     out << d.summary();
00178     return out;
00179 }
00180 
00181 // finds and publishes outgoing data for the modem driver
00182 // first query every Queue for its priority data using
00183 // priority_values(priority, last_send_time)
00184 // priority_values returns false if that object has no data to give
00185 // (either no data at all, or in blackout interval) 
00186 // thus, from all the priority values that return true, pick the one with the lowest
00187 // priority value, or given a tie, pick the one with the oldest last_send_time
00188 void goby::acomms::QueueManager::handle_modem_data_request(const protobuf::ModemDataRequest& request_msg, protobuf::ModemDataTransmission* data_msg)
00189 {
00190     data_msg->mutable_base()->set_src(modem_id_);
00191     if(request_msg.frame() == 0)
00192     {
00193         clear_packet();
00194         data_msg->mutable_base()->set_dest(request_msg.base().dest());
00195     }
00196     else
00197     {
00198         if(packet_ack_)
00199             data_msg->set_ack_requested(packet_ack_);
00200 
00201         data_msg->mutable_base()->set_dest(packet_dest_);    
00202     }
00203     // first (0th) user-frame
00204     Queue* winning_queue = find_next_sender(request_msg, *data_msg, true);
00205     
00206     // no data at all for this frame ... :(
00207     if(!winning_queue)
00208     {
00209         data_msg->set_ack_requested(packet_ack_);
00210         data_msg->mutable_base()->set_dest(packet_dest_);
00211         if(log_) *log_<< group("q_out") << "no data found. sending blank to firmware" 
00212                       << ": " << *data_msg << std::endl; 
00213     }
00214     else
00215     {
00216         stitch_recursive(request_msg, data_msg, winning_queue);
00217     }
00218 }
00219 
00220 bool goby::acomms::QueueManager::stitch_recursive(const protobuf::ModemDataRequest& request_msg, protobuf::ModemDataTransmission* complete_data_msg, Queue* winning_queue)
00221 {
00222     const unsigned CCL_ID_BYTES = HEAD_CCL_ID_SIZE / BITS_IN_BYTE;
00223     
00224     // new user frame (e.g. 32B)
00225     protobuf::ModemDataTransmission next_data_msg = winning_queue->give_data(request_msg);
00226 
00227     if(log_) *log_<< group("q_out") << "sending data to firmware from: "
00228                   << winning_queue->cfg().name() 
00229                   << ": " << next_data_msg << std::endl;
00230 
00231     if(next_data_msg.queue_key().type() == protobuf::QUEUE_DCCL &&
00232        manip_manager_.has(next_data_msg.queue_key().id(),
00233                           protobuf::MessageFile::LOOPBACK_AS_SENT))
00234     {
00235         if(log_) *log_ << group("q_out") << next_data_msg.queue_key() << " LOOPBACK_AS_SENT manipulator set, sending back to decoder" << std::endl;
00236         handle_modem_receive(next_data_msg);
00237     }
00238 
00239     
00240     //
00241     // ACK
00242     // 
00243     // insert ack if desired
00244     if(next_data_msg.ack_requested())
00245         waiting_for_ack_.insert(std::pair<unsigned, Queue*>(request_msg.frame(), winning_queue));
00246     else
00247     {
00248         winning_queue->pop_message(request_msg.frame());
00249         qsize(winning_queue); // notify change in queue size
00250     }
00251 
00252     // if an ack been set, do not unset these
00253     if (packet_ack_ == false) packet_ack_ = next_data_msg.ack_requested();
00254     
00255 
00256     //
00257     // DEST
00258     // 
00259     // update destination address
00260     if(request_msg.frame() == 0)
00261     {
00262         // discipline the destination of the packet if initially unset
00263         if(complete_data_msg->base().dest() == QUERY_DESTINATION_ID)
00264             complete_data_msg->mutable_base()->set_dest(next_data_msg.base().dest());
00265 
00266         if(packet_dest_ == BROADCAST_ID)
00267             packet_dest_ = complete_data_msg->base().dest();
00268     }
00269     else
00270     {
00271         complete_data_msg->mutable_base()->set_dest(packet_dest_);
00272     }
00273 
00274     //
00275     // DCCL
00276     //
00277     if(winning_queue->cfg().key().type() == protobuf::QUEUE_DCCL)
00278     {   
00279         // e.g. 32B
00280         std::string new_data = next_data_msg.data();
00281         
00282         // insert the size of the next field (e.g. 32B) right after the header
00283         std::string frame_size(USER_FRAME_NEXT_SIZE_BYTES,
00284                                static_cast<char>((next_data_msg.data().size()-DCCL_NUM_HEADER_BYTES)));
00285         new_data.insert(DCCL_NUM_HEADER_BYTES, frame_size);
00286         
00287         // append without the CCL ID (old size + 31B)
00288         *(complete_data_msg->mutable_data()) += new_data.substr(CCL_ID_BYTES);
00289         
00290         bool is_last_user_frame = true;
00291         // if remaining bytes is greater than the DCCL header, we have a chance of adding another user-frame
00292         if((request_msg.max_bytes() - complete_data_msg->data().size()) > DCCL_NUM_HEADER_BYTES
00293            && winning_queue->cfg().key().type() != protobuf::QUEUE_CCL)
00294         {
00295             // fetch the next candidate
00296             winning_queue = find_next_sender(request_msg, *complete_data_msg, false);
00297             if(winning_queue) is_last_user_frame = false;
00298         }
00299         
00300         if(!is_last_user_frame)
00301         {
00302             replace_header(false, complete_data_msg, next_data_msg, new_data);
00303             return stitch_recursive(request_msg, complete_data_msg, winning_queue);
00304         }
00305         else
00306         {
00307             replace_header(true, complete_data_msg, next_data_msg, new_data);
00308             // add the CCL ID back on to the message (e.g. 33B)
00309             complete_data_msg->mutable_data()->insert(0, std::string(1, DCCL_CCL_HEADER));
00310             // remove the size of the next field from the last user-frame (e.g. 32B).
00311             complete_data_msg->mutable_data()->erase(complete_data_msg->data().size()-new_data.size()+DCCL_NUM_HEADER_BYTES, USER_FRAME_NEXT_SIZE_BYTES);
00312             // set the ack to conform to the entire message
00313             complete_data_msg->set_ack_requested(packet_ack_);
00314             
00315             return true;
00316         }
00317     }
00318     //
00319     // CCL
00320     //
00321     else
00322     {
00323         // not DCCL, copy the msg and we are done
00324         complete_data_msg->CopyFrom(next_data_msg);
00325         return true;
00326     }
00327 }
00328 
00329 void goby::acomms::QueueManager::replace_header(bool is_last_user_frame, protobuf::ModemDataTransmission* data_msg, const protobuf::ModemDataTransmission& next_data_msg, const std::string& new_data)
00330 {
00331     // decode the header so that we can modify the flags
00332     DCCLHeaderDecoder head_decoder(new_data);
00333 
00334     // don't put the multimessage flag on the last user-frame
00335     head_decoder[HEAD_MULTIMESSAGE_FLAG] =
00336         (!is_last_user_frame) ? true : false;
00337     // put the broadcast flag on if needed 
00338     head_decoder[HEAD_BROADCAST_FLAG] =
00339         (next_data_msg.base().dest() == BROADCAST_ID) ? true : false;
00340 
00341     // re-encode the header
00342     DCCLHeaderEncoder head_encoder(head_decoder.get());
00343 
00344     // replace the header without the CCL ID
00345     const unsigned CCL_ID_BYTES = HEAD_CCL_ID_SIZE / BITS_IN_BYTE;
00346     data_msg->mutable_data()->replace(data_msg->data().size()-new_data.size()+CCL_ID_BYTES,
00347                                       head_encoder.str().size()-CCL_ID_BYTES,
00348                                       head_encoder.str().substr(CCL_ID_BYTES));
00349 }
00350 
00351 
00352 void goby::acomms::QueueManager::clear_packet()
00353 {
00354     typedef std::pair<unsigned, Queue*> P;
00355     BOOST_FOREACH(const P& p, waiting_for_ack_)
00356         p.second->clear_ack_queue();
00357     
00358     waiting_for_ack_.clear();
00359     
00360     packet_ack_ = false;
00361     packet_dest_ = BROADCAST_ID;
00362 }
00363 
00364 
00365 
00366 goby::acomms::Queue* goby::acomms::QueueManager::find_next_sender(const protobuf::ModemDataRequest& request_msg, const protobuf::ModemDataTransmission& data_msg, bool first_user_frame)
00367 {   
00368 // competition between variable about who gets to send
00369     double winning_priority;
00370     boost::posix_time::ptime winning_last_send_time;
00371 
00372     Queue* winning_queue = 0;
00373     
00374     if(log_) *log_<< group("priority") << "starting priority contest\n"
00375                   << "requesting: " << request_msg << "\n"
00376                   << "have " << data_msg.data().size() << "/" << request_msg.max_bytes() << "B: " << data_msg << std::endl;
00377 
00378     
00379     for(std::map<protobuf::QueueKey, Queue>::iterator it = queues_.begin(), n = queues_.end(); it != n; ++it)
00380     {
00381         Queue& oq = it->second;
00382         
00383         // encode on demand
00384         if(oq.cfg().key().type() == protobuf::QUEUE_DCCL &&
00385            manip_manager_.has(oq.cfg().key().id(), protobuf::MessageFile::ON_DEMAND) &&
00386            (!oq.size() || oq.newest_msg_time() + ON_DEMAND_SKEW < util::goby_time()))
00387         {
00388             protobuf::ModemDataTransmission data_msg;
00389             data_msg.mutable_queue_key()->CopyFrom(it->first);
00390             signal_data_on_demand(request_msg, &data_msg);
00391             push_message(data_msg);
00392         }
00393         
00394         double priority;
00395         boost::posix_time::ptime last_send_time;
00396         if(oq.priority_values(priority, last_send_time, request_msg, data_msg))
00397         {
00398             // no winner, better winner, or equal & older winner
00399             // AND not CCL when not the first user-frame
00400             if((!winning_queue || priority > winning_priority ||
00401                 (priority == winning_priority && last_send_time < winning_last_send_time))
00402                && !(oq.cfg().key().type() == protobuf::QUEUE_CCL && !first_user_frame))
00403             {
00404                 winning_priority = priority;
00405                 winning_last_send_time = last_send_time;
00406                 winning_queue = &oq;
00407             }
00408         }
00409     }
00410 
00411     if(log_) *log_<< group("priority") << "\t"
00412                   << "all other queues have no messages" << std::endl;
00413 
00414     if(winning_queue)
00415     {
00416         if(log_) *log_<< group("priority") << winning_queue->cfg().name()
00417                       << " has highest priority." << std::endl;
00418     }
00419     
00420     return winning_queue;
00421 }    
00422 
00423 
00424 void goby::acomms::QueueManager::handle_modem_ack(const protobuf::ModemDataAck& orig_ack_msg)
00425 {
00426     protobuf::ModemDataAck ack_msg(orig_ack_msg);
00427     
00428     if(ack_msg.base().dest() != modem_id_)
00429     {
00430         if(log_) *log_<< group("q_in") << warn
00431                       << "ignoring ack for modem_id = " << ack_msg.base().dest() << std::endl;
00432         return;
00433     }
00434     else if(!waiting_for_ack_.count(ack_msg.frame()))
00435     {
00436         if(log_) *log_<< group("q_in")
00437                       << "got ack but we were not expecting one" << std::endl;
00438         return;
00439     }
00440     else
00441     {
00442         
00443         // got an ack, let's pop this!
00444         if(log_) *log_<< group("q_in") << "received ack for this id" << std::endl;
00445         
00446         
00447         std::multimap<unsigned, Queue *>::iterator it = waiting_for_ack_.find(ack_msg.frame());
00448         while(it != waiting_for_ack_.end())
00449         {
00450             Queue* oq = it->second;
00451 
00452             protobuf::ModemDataTransmission* removed_msg = ack_msg.mutable_orig_msg();
00453             if(!oq->pop_message_ack(ack_msg.frame(), removed_msg))
00454             {
00455                 if(log_) *log_<< group("q_in") << warn
00456                               << "failed to pop message from "
00457                               << oq->cfg().name() << std::endl;
00458             }
00459             else
00460             {
00461                 qsize(oq);
00462                 //ack_msg.mutable_orig_msg()->mutable_queue_key()->CopyFrom(oq->cfg().key());
00463                 signal_ack(ack_msg);
00464                 
00465             }
00466 
00467             if(log_) *log_<< group("q_in") << ack_msg << std::endl;
00468             
00469             waiting_for_ack_.erase(it);
00470             
00471             it = waiting_for_ack_.find(ack_msg.frame());
00472         }
00473     }
00474     
00475     return;    
00476 }
00477 
00478 
00479 // parses and publishes incoming data
00480 // by matching the variableID field with the variable specified
00481 // in a "receive = " line of the configuration file
00482 void goby::acomms::QueueManager::handle_modem_receive(const protobuf::ModemDataTransmission& m)
00483 {
00484     // copy so we can modify in various ways 
00485     protobuf::ModemDataTransmission message = m;
00486     
00487     if(log_) *log_<< group("q_in") << "received message"
00488                   << ": " << message << std::endl;
00489 
00490     std::string data = message.data();
00491     if(data.size() < DCCL_NUM_HEADER_BYTES)
00492         return;
00493     
00494     DCCLHeaderDecoder head_decoder(data);
00495     int ccl_id = head_decoder[HEAD_CCL_ID];
00496 
00497     // check for queue_dccl type
00498     if(ccl_id == DCCL_CCL_HEADER)
00499     {
00500         unstitch_recursive(&data, &message);
00501     }
00502     // check for ccl type
00503     else
00504     {
00505         protobuf::QueueKey key;
00506         key.set_type(protobuf::QUEUE_CCL);
00507         key.set_id(ccl_id);
00508         
00509         std::map<protobuf::QueueKey, Queue>::iterator it = queues_.find(key);
00510         
00511         if (it != queues_.end())
00512         {
00513             message.mutable_queue_key()->CopyFrom(key);
00514             signal_receive_ccl(message);
00515         }
00516         else
00517         {
00518             if(log_) *log_<< group("q_in") << warn << "incoming data string is not for us (not DCCL or known CCL)." << std::endl;
00519         }
00520     }
00521 }
00522 
00523 bool goby::acomms::QueueManager::unstitch_recursive(std::string* data, protobuf::ModemDataTransmission* data_msg)
00524 {
00525     unsigned original_dest = data_msg->base().dest();
00526     DCCLHeaderDecoder head_decoder(*data);
00527     bool multimessage_flag = head_decoder[HEAD_MULTIMESSAGE_FLAG];
00528     bool broadcast_flag = head_decoder[HEAD_BROADCAST_FLAG];
00529     unsigned dccl_id = head_decoder[HEAD_DCCL_ID];
00530     
00531     // test multimessage bit
00532     if(multimessage_flag)
00533     {
00534         // extract frame_size
00535         // TODO (tes): Make this work properly for strings larger than one byte 
00536         unsigned frame_size = data->substr(DCCL_NUM_HEADER_BYTES, USER_FRAME_NEXT_SIZE_BYTES)[0];
00537         
00538         // erase the frame size byte
00539         data->erase(DCCL_NUM_HEADER_BYTES, USER_FRAME_NEXT_SIZE_BYTES);
00540         
00541         // extract the data for this user-frame
00542         data_msg->set_data(data->substr(0, (frame_size + DCCL_NUM_HEADER_BYTES)));
00543         
00544         data->erase(USER_FRAME_NEXT_SIZE_BYTES,
00545                     (frame_size + DCCL_NUM_HEADER_BYTES-USER_FRAME_NEXT_SIZE_BYTES));
00546     }
00547     else
00548     {
00549         data_msg->set_data(*data);
00550     }
00551     
00552     // reset these flags
00553     head_decoder[HEAD_MULTIMESSAGE_FLAG] = false;
00554     head_decoder[HEAD_BROADCAST_FLAG] = false;
00555     
00556     DCCLHeaderEncoder head_encoder(head_decoder.get());
00557     data_msg->mutable_data()->replace(0, DCCL_NUM_HEADER_BYTES, head_encoder.str());
00558     // overwrite destination as BROADCAST if broadcast bit is set
00559     data_msg->mutable_base()->set_dest(broadcast_flag ? BROADCAST_ID : original_dest);
00560     publish_incoming_piece(data_msg, dccl_id); 
00561     
00562     // put the destination back
00563     data_msg->mutable_base()->set_dest(original_dest);
00564 
00565     // unstitch until the multimessage flag is no longer set
00566     return multimessage_flag ? unstitch_recursive(data, data_msg) : true;
00567 }
00568 
00569 bool goby::acomms::QueueManager::publish_incoming_piece(protobuf::ModemDataTransmission* message, const unsigned incoming_var_id)
00570 {
00571     if(message->base().dest() != BROADCAST_ID && message->base().dest() != modem_id_ &&
00572        !manip_manager_.has(incoming_var_id, protobuf::MessageFile::PROMISCUOUS))
00573     {
00574         if(log_) *log_<< group("q_in") << warn << "ignoring message for modem_id = "
00575                       << message->base().dest() << std::endl;
00576         return false;
00577     }
00578 
00579     protobuf::QueueKey dccl_key;
00580     dccl_key.set_type(protobuf::QUEUE_DCCL);
00581     dccl_key.set_id(incoming_var_id);
00582     
00583     std::map<protobuf::QueueKey, Queue>::iterator it_dccl = queues_.find(dccl_key);
00584     
00585     if(it_dccl == queues_.end())
00586     {
00587         if(log_) *log_<< group("q_in") << warn << "no mapping for this variable ID: "
00588                       << incoming_var_id << std::endl;
00589         return false;
00590     }
00591 
00592     message->mutable_queue_key()->CopyFrom(dccl_key);
00593     signal_receive(*message);    
00594 
00595     return true;
00596 }
00597 
00598 void goby::acomms::QueueManager::add_flex_groups(util::FlexOstream* tout)
00599 {
00600     tout->add_group("push", util::Colors::lt_cyan, "stack push - outgoing messages (goby_queue)");
00601     tout->add_group("pop",  util::Colors::lt_green, "stack pop - outgoing messages (goby_queue)");
00602     tout->add_group("priority",  util::Colors::yellow, "priority contest (goby_queue)");
00603     tout->add_group("q_out",  util::Colors::cyan, "outgoing queuing messages (goby_queue)");
00604     tout->add_group("q_in",  util::Colors::green, "incoming queuing messages (goby_queue)");
00605 }
00606 
00607 
00608 void goby::acomms::QueueManager::set_cfg(const protobuf::QueueManagerConfig& cfg)
00609 {
00610     cfg_ = cfg;
00611     process_cfg();
00612 }
00613 
00614 void goby::acomms::QueueManager::merge_cfg(const protobuf::QueueManagerConfig& cfg)
00615 {
00616     cfg_.MergeFrom(cfg);
00617     process_cfg();
00618 }
00619 
00620 
00621 void goby::acomms::QueueManager::process_cfg()
00622 {
00623     queues_.clear();
00624     waiting_for_ack_.clear();
00625     manip_manager_.clear();
00626     
00627     for(int i = 0, n = cfg_.message_file_size(); i < n; ++i)
00628     {
00629         std::set<unsigned> new_ids = add_xml_queue_file(cfg_.message_file(i).path());
00630         BOOST_FOREACH(unsigned new_id, new_ids)
00631         {
00632             for(int j = 0, o = cfg_.message_file(i).manipulator_size(); j < o; ++j)
00633                 manip_manager_.add(new_id, cfg_.message_file(i).manipulator(j));
00634         }
00635     }
00636     
00637     for(int i = 0, n = cfg_.queue_size(); i < n; ++i)
00638         add_queue(cfg_.queue(i));
00639 
00640     modem_id_ = cfg_.modem_id();
00641 }
00642 
00643 void goby::acomms::QueueManager::qsize(Queue* q)            
00644 {
00645     protobuf::QueueSize size;
00646     size.mutable_key()->CopyFrom(q->cfg().key());
00647     size.set_size(q->size());
00648     signal_queue_size_change(size);
00649 }
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends