23 #ifndef ZEROMQNODE20110413H    24 #define ZEROMQNODE20110413H    26 #include <boost/bind.hpp>    27 #include <boost/function.hpp>    28 #include <boost/signals2.hpp>    29 #include <boost/thread/mutex.hpp>    33 #include "goby/common/protobuf/zero_mq_node_config.pb.h"    37 #include "core_constants.h"    38 #include "goby/common/logger.h"    48         : global_blackout_(boost::posix_time::not_a_date_time), local_blackout_set_(
false),
    49           global_blackout_set_(
false)
    54         : socket_(socket), global_blackout_(boost::posix_time::not_a_date_time),
    55           local_blackout_set_(
false), global_blackout_set_(
false)
    59     void set_global_blackout(boost::posix_time::time_duration duration);
    60     void set_blackout(MarshallingScheme marshalling_scheme, 
const std::string& identifier,
    61                       boost::posix_time::time_duration duration);
    63     void clear_blackout(MarshallingScheme marshalling_scheme, 
const std::string& identifier)
    65         blackout_info_.erase(std::make_pair(marshalling_scheme, identifier));
    66         local_blackout_set_ = 
false;
    68     void clear_global_blackout()
    70         global_blackout_ = boost::posix_time::not_a_date_time;
    71         global_blackout_set_ = 
false;
    76     bool check_blackout(MarshallingScheme marshalling_scheme, 
const std::string& identifier);
    78     void set_socket(boost::shared_ptr<zmq::socket_t> socket) { socket_ = socket; }
    80     boost::shared_ptr<zmq::socket_t>& socket() { 
return socket_; }
    85         BlackoutInfo(boost::posix_time::time_duration interval = boost::posix_time::not_a_date_time)
    86             : blackout_interval(interval), last_post_time(boost::posix_time::neg_infin)
    90         boost::posix_time::time_duration blackout_interval;
    91         boost::posix_time::ptime last_post_time;
    // Socket handle: copied in by the one-argument constructor and set_socket(),
    // handed out by reference from socket().
    94     boost::shared_ptr<zmq::socket_t> socket_;
    // Global blackout duration; holds not_a_date_time after construction and after
    // clear_global_blackout().
    96     boost::posix_time::time_duration global_blackout_;
    // Starts false; clear_blackout() writes false to it.
    97     bool local_blackout_set_;
    // Starts false; clear_global_blackout() writes false to it.
    98     bool global_blackout_set_;
    // Per-message blackout records keyed by (marshalling scheme, identifier);
    // clear_blackout() erases by that same pair.
    99     std::map<std::pair<MarshallingScheme, std::string>, BlackoutInfo> blackout_info_;
   121     void subscribe_all(
int socket_id);
   122     void unsubscribe_all(
int socket_id);
   124     void send(MarshallingScheme marshalling_scheme, 
const std::string& identifier,
   125               const std::string& body, 
int socket_id);
   127     void subscribe(MarshallingScheme marshalling_scheme, 
const std::string& identifier,
   130     void unsubscribe(MarshallingScheme marshalling_scheme, 
const std::string& identifier,
   134     void connect_inbox_slot(
void (C::*mem_func)(MarshallingScheme, 
const std::string&,
   135                                                 const std::string&, 
int),
   138         goby::glog.is(goby::common::logger::DEBUG1) &&
   139             goby::glog << 
"ZeroMQService: made connection for: " << 
typeid(obj).name() << std::endl;
   140         connect_inbox_slot(boost::bind(mem_func, obj, _1, _2, _3, _4));
   143     void connect_inbox_slot(
   144         boost::function<
void(MarshallingScheme marshalling_scheme, 
const std::string& identifier,
   145                              const std::string& body, 
int socket_id)>
   148         inbox_signal_.connect(slot);
   151     bool poll(
long timeout = -1);
   156         poll_callbacks_.clear();
   162     void register_poll_item(
const zmq::pollitem_t& item, 
void (C::*mem_func)(
const void*, 
int, 
int),
   165         register_poll_item(item, boost::bind(mem_func, obj, _1, _2, _3));
   169     register_poll_item(
const zmq::pollitem_t& item,
   170                        boost::function<
void(
const void* data, 
int size, 
int message_part)> callback)
   172         poll_items_.push_back(item);
   173         poll_callbacks_.insert(std::make_pair(poll_items_.size() - 1, callback));
   176     boost::shared_ptr<zmq::context_t> zmq_context() { 
return context_; }
   178     boost::signals2::signal<void(MarshallingScheme marshalling_scheme,
   179                                  const std::string& identifier, 
int socket_id)>
   182     boost::signals2::signal<void(MarshallingScheme marshalling_scheme,
   183                                  const std::string& identifier, 
int socket_id)>
   186     boost::signals2::signal<void(MarshallingScheme marshalling_scheme,
   187                                  const std::string& identifier, 
int socket_id)>
   190     boost::signals2::signal<void(MarshallingScheme marshalling_scheme,
   191                                  const std::string& identifier, 
int socket_id)>
   192         post_subscribe_hooks;
   /// \brief Group name constant for the "out" direction.
   ///
   /// \return the literal "goby::common::zmq::out"
   ///         (presumably the glog group used for outbound ZeroMQ traffic - confirm at call sites).
   static std::string glog_out_group()
   {
       return std::string("goby::common::zmq::out");
   }
   /// \brief Group name constant for the "in" direction.
   ///
   /// \return the literal "goby::common::zmq::in"
   ///         (presumably the glog group used for inbound ZeroMQ traffic - confirm at call sites).
   static std::string glog_in_group()
   {
       return std::string("goby::common::zmq::in");
   }
   207     void handle_receive(
const void* data, 
int size, 
int message_part, 
int socket_id);
   209     int socket_type(protobuf::ZeroMQServiceConfig::Socket::SocketType type);
   212     boost::shared_ptr<zmq::context_t> context_;
   213     std::map<int, ZeroMQSocket> sockets_;
   214     std::vector<zmq::pollitem_t> poll_items_;
   219     std::map<size_t, boost::function<void(const void* data, int size, int message_part)> >
   222     boost::signals2::signal<void(MarshallingScheme marshalling_scheme,
   223                                  const std::string& identifier, 
const std::string& body,
   226     boost::mutex poll_mutex_;
 
common::FlexOstream glog
Access the Goby logger through this object. 
 
The global namespace for the Goby project.