Goby v2
zeromq_service.h
1 // Copyright 2009-2018 Toby Schneider (http://gobysoft.org/index.wt/people/toby)
2 // GobySoft, LLC (2013-)
3 // Massachusetts Institute of Technology (2007-2014)
4 // Community contributors (see AUTHORS file)
5 //
6 //
7 // This file is part of the Goby Underwater Autonomy Project Libraries
8 // ("The Goby Libraries").
9 //
10 // The Goby Libraries are free software: you can redistribute them and/or modify
11 // them under the terms of the GNU Lesser General Public License as published by
12 // the Free Software Foundation, either version 2.1 of the License, or
13 // (at your option) any later version.
14 //
15 // The Goby Libraries are distributed in the hope that they will be useful,
16 // but WITHOUT ANY WARRANTY; without even the implied warranty of
17 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 // GNU Lesser General Public License for more details.
19 //
20 // You should have received a copy of the GNU Lesser General Public License
21 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
22 
23 #ifndef ZEROMQNODE20110413H
24 #define ZEROMQNODE20110413H
25 
26 #include <boost/bind.hpp>
27 #include <boost/function.hpp>
28 #include <boost/signals2.hpp>
29 #include <boost/thread/mutex.hpp>
30 #include <iostream>
31 #include <string>
32 
33 #include "goby/common/protobuf/zero_mq_node_config.pb.h"
34 
35 #include <zmq.hpp>
36 
37 #include "core_constants.h"
38 #include "goby/common/logger.h"
39 
40 namespace goby
41 {
42 namespace common
43 {
45 {
46  public:
47  ZeroMQSocket()
48  : global_blackout_(boost::posix_time::not_a_date_time), local_blackout_set_(false),
49  global_blackout_set_(false)
50  {
51  }
52 
53  ZeroMQSocket(boost::shared_ptr<zmq::socket_t> socket)
54  : socket_(socket), global_blackout_(boost::posix_time::not_a_date_time),
55  local_blackout_set_(false), global_blackout_set_(false)
56  {
57  }
58 
// Declared here, defined elsewhere.
// NOTE(review): presumably stores `duration` in global_blackout_ and raises
// global_blackout_set_ -- confirm against the definition.
59  void set_global_blackout(boost::posix_time::time_duration duration);
// Declared here, defined elsewhere.
// NOTE(review): presumably registers a blackout of `duration` under the
// (marshalling_scheme, identifier) key, i.e. the same key clear_blackout() erases
// from blackout_info_ -- confirm against the definition.
60  void set_blackout(MarshallingScheme marshalling_scheme, const std::string& identifier,
61  boost::posix_time::time_duration duration);
62 
63  void clear_blackout(MarshallingScheme marshalling_scheme, const std::string& identifier)
64  {
65  blackout_info_.erase(std::make_pair(marshalling_scheme, identifier));
66  local_blackout_set_ = false;
67  }
68  void clear_global_blackout()
69  {
70  global_blackout_ = boost::posix_time::not_a_date_time;
71  global_blackout_set_ = false;
72  }
73 
// Blackout test for one (marshalling_scheme, identifier) pair; defined elsewhere.
// Return value:
74  // true means go ahead and post this message
75  // false means in blackout
76  bool check_blackout(MarshallingScheme marshalling_scheme, const std::string& identifier);
77 
78  void set_socket(boost::shared_ptr<zmq::socket_t> socket) { socket_ = socket; }
79 
80  boost::shared_ptr<zmq::socket_t>& socket() { return socket_; }
81 
82  private:
83  struct BlackoutInfo
84  {
85  BlackoutInfo(boost::posix_time::time_duration interval = boost::posix_time::not_a_date_time)
86  : blackout_interval(interval), last_post_time(boost::posix_time::neg_infin)
87  {
88  }
89 
90  boost::posix_time::time_duration blackout_interval;
91  boost::posix_time::ptime last_post_time;
92  };
93 
// Wrapped zmq socket: set by the one-argument constructor or set_socket(),
// exposed by socket().
94  boost::shared_ptr<zmq::socket_t> socket_;
95 
// Global blackout duration: not_a_date_time after construction and after
// clear_global_blackout().
96  boost::posix_time::time_duration global_blackout_;
// Single flag for all local blackouts: false after construction, written by
// clear_blackout().
97  bool local_blackout_set_;
// false after construction and after clear_global_blackout().
98  bool global_blackout_set_;
// Blackout bookkeeping keyed by (marshalling scheme, identifier); clear_blackout()
// erases entries by that key.
99  std::map<std::pair<MarshallingScheme, std::string>, BlackoutInfo> blackout_info_;
100 };
101 
103 {
104  public:
// Default constructor; defined elsewhere.
105  ZeroMQService();
// Constructs from a caller-supplied zmq context; defined elsewhere.
// NOTE(review): presumably stored in context_ -- confirm against the definition.
106  ZeroMQService(boost::shared_ptr<zmq::context_t> context);
// Virtual destructor; defined elsewhere.
107  virtual ~ZeroMQService();
108 
109  void set_cfg(const protobuf::ZeroMQServiceConfig& cfg)
110  {
111  process_cfg(cfg);
112  cfg_.CopyFrom(cfg);
113  }
114 
115  void merge_cfg(const protobuf::ZeroMQServiceConfig& cfg)
116  {
117  process_cfg(cfg);
118  cfg_.MergeFrom(cfg);
119  }
120 
// Declared here, defined elsewhere. `socket_id` selects the socket to act on.
// NOTE(review): presumably subscribes that socket to every message -- confirm.
121  void subscribe_all(int socket_id);
// Declared here, defined elsewhere.
// NOTE(review): presumably the inverse of subscribe_all() -- confirm.
122  void unsubscribe_all(int socket_id);
123 
// Declared here, defined elsewhere. Takes the marshalling scheme, an identifier,
// the message body and the id of the socket to use.
// NOTE(review): presumably publishes `body` on socket `socket_id` -- confirm.
124  void send(MarshallingScheme marshalling_scheme, const std::string& identifier,
125  const std::string& body, int socket_id);
126 
// Declared here, defined elsewhere.
// NOTE(review): presumably subscribes socket `socket_id` to messages matching
// (marshalling_scheme, identifier) -- confirm against the definition.
127  void subscribe(MarshallingScheme marshalling_scheme, const std::string& identifier,
128  int socket_id);
129 
// Declared here, defined elsewhere.
// NOTE(review): presumably undoes a matching subscribe() -- confirm.
130  void unsubscribe(MarshallingScheme marshalling_scheme, const std::string& identifier,
131  int socket_id);
132 
133  template <class C>
134  void connect_inbox_slot(void (C::*mem_func)(MarshallingScheme, const std::string&,
135  const std::string&, int),
136  C* obj)
137  {
138  goby::glog.is(goby::common::logger::DEBUG1) &&
139  goby::glog << "ZeroMQService: made connection for: " << typeid(obj).name() << std::endl;
140  connect_inbox_slot(boost::bind(mem_func, obj, _1, _2, _3, _4));
141  }
142 
143  void connect_inbox_slot(
144  boost::function<void(MarshallingScheme marshalling_scheme, const std::string& identifier,
145  const std::string& body, int socket_id)>
146  slot)
147  {
148  inbox_signal_.connect(slot);
149  }
150 
// Declared here, defined elsewhere. `timeout` defaults to -1.
// NOTE(review): the unit of `timeout`, the meaning of -1 and the meaning of the
// bool result are not visible in this header -- confirm against the definition.
151  bool poll(long timeout = -1);
152  void close_all()
153  {
154  sockets_.clear();
155  poll_items_.clear();
156  poll_callbacks_.clear();
157  }
158 
// Declared here, defined elsewhere. Returns a reference to a ZeroMQSocket.
// NOTE(review): presumably looks `socket_id` up in sockets_; behaviour for an
// unknown id is not visible here -- confirm against the definition.
159  ZeroMQSocket& socket_from_id(int socket_id);
160 
161  template <class C>
162  void register_poll_item(const zmq::pollitem_t& item, void (C::*mem_func)(const void*, int, int),
163  C* obj)
164  {
165  register_poll_item(item, boost::bind(mem_func, obj, _1, _2, _3));
166  }
167 
168  void
169  register_poll_item(const zmq::pollitem_t& item,
170  boost::function<void(const void* data, int size, int message_part)> callback)
171  {
172  poll_items_.push_back(item);
173  poll_callbacks_.insert(std::make_pair(poll_items_.size() - 1, callback));
174  }
175 
176  boost::shared_ptr<zmq::context_t> zmq_context() { return context_; }
177 
// Public boost::signals2 signals; every slot receives
// (marshalling scheme, identifier, socket id).
// NOTE(review): the names suggest they fire before/after send() and subscribe();
// the emission sites are not visible in this header -- confirm.
178  boost::signals2::signal<void(MarshallingScheme marshalling_scheme,
179  const std::string& identifier, int socket_id)>
180  pre_send_hooks;
181 
182  boost::signals2::signal<void(MarshallingScheme marshalling_scheme,
183  const std::string& identifier, int socket_id)>
184  pre_subscribe_hooks;
185 
186  boost::signals2::signal<void(MarshallingScheme marshalling_scheme,
187  const std::string& identifier, int socket_id)>
188  post_send_hooks;
189 
190  boost::signals2::signal<void(MarshallingScheme marshalling_scheme,
191  const std::string& identifier, int socket_id)>
192  post_subscribe_hooks;
193 
// Fixed group name string "goby::common::zmq::out".
static std::string glog_out_group()
{
    const std::string group_name("goby::common::zmq::out");
    return group_name;
}
// Fixed group name string "goby::common::zmq::in".
static std::string glog_in_group()
{
    const std::string group_name("goby::common::zmq::in");
    return group_name;
}
196 
// Grants ZeroMQSocket access to this class's private members.
197  friend class ZeroMQSocket;
198 
199  private:
// Copy-assignment is declared in the private section.
// NOTE(review): presumably left undefined to forbid assignment -- confirm.
201  ZeroMQService& operator=(const ZeroMQService&);
202 
// Declared here, defined elsewhere.
// NOTE(review): presumably shared constructor set-up -- confirm.
203  void init();
204 
// Called by set_cfg() and merge_cfg() with the incoming configuration before it is
// copied or merged into cfg_; defined elsewhere.
205  void process_cfg(const protobuf::ZeroMQServiceConfig& cfg);
206 
// Declared here, defined elsewhere. Takes a raw data pointer, its size, a message
// part index and the id of the socket concerned.
// NOTE(review): presumably the receive path that feeds inbox_signal_ -- confirm.
207  void handle_receive(const void* data, int size, int message_part, int socket_id);
208 
// Declared here, defined elsewhere. Maps the protobuf SocketType enum to an int.
// NOTE(review): presumably the matching zmq socket type constant -- confirm.
209  int socket_type(protobuf::ZeroMQServiceConfig::Socket::SocketType type);
210 
211  private:
// zmq context; a copy of this pointer is returned by zmq_context().
212  boost::shared_ptr<zmq::context_t> context_;
// Sockets keyed by an int; emptied by close_all().
// NOTE(review): presumably the key is the socket_id used throughout the public
// interface -- confirm.
213  std::map<int, ZeroMQSocket> sockets_;
// Poll items in registration order; register_poll_item() appends, close_all() empties.
214  std::vector<zmq::pollitem_t> poll_items_;
215 
217 
218  // maps poll_items_ index to a callback function
219  std::map<size_t, boost::function<void(const void* data, int size, int message_part)> >
220  poll_callbacks_;
221 
// Signal that connect_inbox_slot() attaches slots to; slots receive
// (marshalling scheme, identifier, body, socket id).
222  boost::signals2::signal<void(MarshallingScheme marshalling_scheme,
223  const std::string& identifier, const std::string& body,
224  int socket_id)>
225  inbox_signal_;
// NOTE(review): presumably guards poll(); no use is visible in this header -- confirm.
226  boost::mutex poll_mutex_;
227 };
228 } // namespace common
229 } // namespace goby
230 
231 #endif
common::FlexOstream glog
Access the Goby logger through this object.
The global namespace for the Goby project.