Goby v2
zeromq_service.cpp
1 // Copyright 2009-2018 Toby Schneider (http://gobysoft.org/index.wt/people/toby)
2 // GobySoft, LLC (2013-)
3 // Massachusetts Institute of Technology (2007-2014)
4 // Community contributors (see AUTHORS file)
5 //
6 //
7 // This file is part of the Goby Underwater Autonomy Project Libraries
8 // ("The Goby Libraries").
9 //
10 // The Goby Libraries are free software: you can redistribute them and/or modify
11 // them under the terms of the GNU Lesser General Public License as published by
12 // the Free Software Foundation, either version 2.1 of the License, or
13 // (at your option) any later version.
14 //
15 // The Goby Libraries are distributed in the hope that they will be useful,
16 // but WITHOUT ANY WARRANTY; without even the implied warranty of
17 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 // GNU Lesser General Public License for more details.
19 //
20 // You should have received a copy of the GNU Lesser General Public License
21 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
22 
23 #include "goby/common/logger.h" // for glog & manipulators die, warn, group(), etc.
24 #include "goby/util/as.h" // for goby::util::as
25 #include "goby/util/binary.h" // for hex_encode
26 
27 #include "goby/common/exception.h"
28 #include "zeromq_packet.h"
29 #include "zeromq_service.h"
30 
31 using goby::glog;
32 using goby::util::as;
33 using goby::util::hex_encode;
34 using namespace goby::common::logger_lock;
35 using namespace goby::common::logger;
36 
// Compatibility shims so the rest of this file can be written against one API
// regardless of the ZeroMQ major version it is compiled with.
#if ZMQ_VERSION_MAJOR == 2
// Map the zmq_msg_send / zmq_msg_recv spelling (message first) onto the
// ZeroMQ 2.x zmq_send / zmq_recv calls (socket first).
#define zmq_msg_send(msg, sock, opt) zmq_send(sock, msg, opt)
#define zmq_msg_recv(msg, sock, opt) zmq_recv(sock, msg, opt)
#define ZMQ_POLL_DIVISOR 1 // zmq_poll is usec
// Type of the value read back with zmq_getsockopt(..., ZMQ_RCVMORE, ...).
// A typedef (rather than a macro) so the name obeys normal scoping rules.
typedef int64_t more_t;
#else
#define ZMQ_POLL_DIVISOR 1000 // zmq_poll is msec
// Type of the value read back with zmq_getsockopt(..., ZMQ_RCVMORE, ...).
// A typedef (rather than a macro) so the name obeys normal scoping rules.
typedef int more_t;
#endif
46 
47 goby::common::ZeroMQService::ZeroMQService(boost::shared_ptr<zmq::context_t> context)
48  : context_(context)
49 {
50  init();
51 }
52 
53 goby::common::ZeroMQService::ZeroMQService() : context_(new zmq::context_t(2)) { init(); }
54 
55 void goby::common::ZeroMQService::init()
56 {
57  glog.add_group(glog_out_group(), common::Colors::lt_magenta);
58  glog.add_group(glog_in_group(), common::Colors::lt_blue);
59 }
60 
61 void goby::common::ZeroMQService::process_cfg(const protobuf::ZeroMQServiceConfig& cfg)
62 {
63  for (int i = 0, n = cfg.socket_size(); i < n; ++i)
64  {
65  if (!sockets_.count(cfg.socket(i).socket_id()))
66  {
67  // TODO (tes) - check for compatible socket type
68  boost::shared_ptr<zmq::socket_t> new_socket(
69  new zmq::socket_t(*context_, socket_type(cfg.socket(i).socket_type())));
70 
71  sockets_.insert(std::make_pair(cfg.socket(i).socket_id(), ZeroMQSocket(new_socket)));
72 
73  // Initialize poll set
74  zmq::pollitem_t item = {(void*)*new_socket, 0, ZMQ_POLLIN, 0};
75 
76  // publish sockets can't receive
77  if (cfg.socket(i).socket_type() != protobuf::ZeroMQServiceConfig::Socket::PUBLISH)
78  {
79  register_poll_item(item, boost::bind(&goby::common::ZeroMQService::handle_receive,
80  this, _1, _2, _3, cfg.socket(i).socket_id()));
81  }
82  }
83 
84  boost::shared_ptr<zmq::socket_t> this_socket =
85  socket_from_id(cfg.socket(i).socket_id()).socket();
86 
87  if (cfg.socket(i).connect_or_bind() == protobuf::ZeroMQServiceConfig::Socket::CONNECT)
88  {
89  std::string endpoint;
90  switch (cfg.socket(i).transport())
91  {
92  case protobuf::ZeroMQServiceConfig::Socket::INPROC:
93  endpoint = "inproc://" + cfg.socket(i).socket_name();
94  break;
95 
96  case protobuf::ZeroMQServiceConfig::Socket::IPC:
97  endpoint = "ipc://" + cfg.socket(i).socket_name();
98  break;
99 
100  case protobuf::ZeroMQServiceConfig::Socket::TCP:
101  endpoint = "tcp://" + cfg.socket(i).ethernet_address() + ":" +
102  as<std::string>(cfg.socket(i).ethernet_port());
103  break;
104 
105  case protobuf::ZeroMQServiceConfig::Socket::PGM:
106  endpoint = "pgm://" + cfg.socket(i).ethernet_address() + ";" +
107  cfg.socket(i).multicast_address() + ":" +
108  as<std::string>(cfg.socket(i).ethernet_port());
109  break;
110 
111  case protobuf::ZeroMQServiceConfig::Socket::EPGM:
112  endpoint = "epgm://" + cfg.socket(i).ethernet_address() + ";" +
113  cfg.socket(i).multicast_address() + ":" +
114  as<std::string>(cfg.socket(i).ethernet_port());
115  break;
116  }
117 
118  try
119  {
120  this_socket->connect(endpoint.c_str());
121  glog.is(DEBUG1) && glog << group(glog_out_group())
122  << cfg.socket(i).ShortDebugString()
123  << " connected to endpoint - " << endpoint << std::endl;
124  }
125  catch (std::exception& e)
126  {
127  std::stringstream ess;
128  ess << "cannot connect to: " << endpoint << ": " << e.what();
129  throw(goby::Exception(ess.str()));
130  }
131  }
132  else if (cfg.socket(i).connect_or_bind() == protobuf::ZeroMQServiceConfig::Socket::BIND)
133  {
134  std::string endpoint;
135  switch (cfg.socket(i).transport())
136  {
137  case protobuf::ZeroMQServiceConfig::Socket::INPROC:
138  endpoint = "inproc://" + cfg.socket(i).socket_name();
139  break;
140 
141  case protobuf::ZeroMQServiceConfig::Socket::IPC:
142  endpoint = "ipc://" + cfg.socket(i).socket_name();
143  break;
144 
145  case protobuf::ZeroMQServiceConfig::Socket::TCP:
146  endpoint = "tcp://*:" + as<std::string>(cfg.socket(i).ethernet_port());
147  break;
148 
149  case protobuf::ZeroMQServiceConfig::Socket::PGM:
150  throw(goby::Exception("Cannot BIND to PGM socket (use CONNECT)"));
151  break;
152 
153  case protobuf::ZeroMQServiceConfig::Socket::EPGM:
154  throw(goby::Exception("Cannot BIND to EPGM socket (use CONNECT)"));
155  break;
156  }
157 
158  try
159  {
160  this_socket->bind(endpoint.c_str());
161  glog.is(DEBUG1) &&
162  glog << group(glog_out_group()) << "bound to endpoint - " << endpoint
163  << ", Socket: " << cfg.socket(i).ShortDebugString() << std::endl;
164  }
165  catch (std::exception& e)
166  {
167  std::stringstream ess;
168  ess << "cannot bind to: " << endpoint << ": " << e.what();
169  throw(goby::Exception(ess.str()));
170  }
171  }
172  }
173 }
174 
175 goby::common::ZeroMQService::~ZeroMQService()
176 {
177  // std::cout << "ZeroMQService: " << this << ": destroyed" << std::endl;
178  // std::cout << "poll_mutex " << &poll_mutex_ << std::endl;
179 }
180 
181 int goby::common::ZeroMQService::socket_type(protobuf::ZeroMQServiceConfig::Socket::SocketType type)
182 {
183  switch (type)
184  {
185  case protobuf::ZeroMQServiceConfig::Socket::PUBLISH: return ZMQ_PUB;
186  case protobuf::ZeroMQServiceConfig::Socket::SUBSCRIBE: return ZMQ_SUB;
187  case protobuf::ZeroMQServiceConfig::Socket::REPLY: return ZMQ_REP;
188  case protobuf::ZeroMQServiceConfig::Socket::REQUEST:
189  return ZMQ_REQ;
190  // case protobuf::ZeroMQServiceConfig::Socket::ZMQ_PUSH: return ZMQ_PUSH;
191  // case protobuf::ZeroMQServiceConfig::Socket::ZMQ_PULL: return ZMQ_PULL;
192  // case protobuf::ZeroMQServiceConfig::Socket::ZMQ_DEALER: return ZMQ_DEALER;
193  // case protobuf::ZeroMQServiceConfig::Socket::ZMQ_ROUTER: return ZMQ_ROUTER;
194  }
195  throw(goby::Exception("Invalid SocketType"));
196 }
197 
198 goby::common::ZeroMQSocket& goby::common::ZeroMQService::socket_from_id(int socket_id)
199 {
200  std::map<int, ZeroMQSocket>::iterator it = sockets_.find(socket_id);
201  if (it != sockets_.end())
202  return it->second;
203  else
204  throw(goby::Exception("Attempted to access socket_id " + as<std::string>(socket_id) +
205  " which does not exist"));
206 }
207 
208 void goby::common::ZeroMQService::subscribe_all(int socket_id)
209 {
210  socket_from_id(socket_id).socket()->setsockopt(ZMQ_SUBSCRIBE, 0, 0);
211 }
212 
213 void goby::common::ZeroMQService::unsubscribe_all(int socket_id)
214 {
215  socket_from_id(socket_id).socket()->setsockopt(ZMQ_UNSUBSCRIBE, 0, 0);
216 }
217 
218 void goby::common::ZeroMQService::subscribe(MarshallingScheme marshalling_scheme,
219  const std::string& identifier, int socket_id)
220 {
221  pre_subscribe_hooks(marshalling_scheme, identifier, socket_id);
222 
223  std::string zmq_filter = zeromq_packet_make_header(marshalling_scheme, identifier);
224  int NULL_TERMINATOR_SIZE = 1;
225  zmq_filter.resize(zmq_filter.size() - NULL_TERMINATOR_SIZE);
226  socket_from_id(socket_id).socket()->setsockopt(ZMQ_SUBSCRIBE, zmq_filter.c_str(),
227  zmq_filter.size());
228 
229  glog.is(DEBUG1) && glog << group(glog_in_group()) << "subscribed for marshalling "
230  << marshalling_scheme << " with identifier: [" << identifier
231  << "] using zmq_filter: " << goby::util::hex_encode(zmq_filter)
232  << std::endl;
233 
234  post_subscribe_hooks(marshalling_scheme, identifier, socket_id);
235 }
236 
237 void goby::common::ZeroMQService::unsubscribe(MarshallingScheme marshalling_scheme,
238  const std::string& identifier, int socket_id)
239 {
240  std::string zmq_filter = zeromq_packet_make_header(marshalling_scheme, identifier);
241  int NULL_TERMINATOR_SIZE = 1;
242  zmq_filter.resize(zmq_filter.size() - NULL_TERMINATOR_SIZE);
243  socket_from_id(socket_id).socket()->setsockopt(ZMQ_UNSUBSCRIBE, zmq_filter.c_str(),
244  zmq_filter.size());
245 
246  glog.is(DEBUG1) && glog << group(glog_in_group()) << "unsubscribed for marshalling "
247  << marshalling_scheme << " with identifier: [" << identifier
248  << "] using zmq_filter: " << goby::util::hex_encode(zmq_filter)
249  << std::endl;
250 }
251 
252 void goby::common::ZeroMQService::send(MarshallingScheme marshalling_scheme,
253  const std::string& identifier, const std::string& body,
254  int socket_id)
255 {
256  pre_send_hooks(marshalling_scheme, identifier, socket_id);
257 
258  std::string raw;
259  zeromq_packet_encode(&raw, marshalling_scheme, identifier, body);
260 
261  zmq::message_t msg(raw.size());
262  memcpy(msg.data(), raw.c_str(), raw.size()); // insert packet
263 
264  glog.is(DEBUG3) &&
265  glog << group(glog_out_group()) << "Sent message (hex): "
266  << hex_encode(std::string(static_cast<const char*>(msg.data()), msg.size()))
267  << std::endl;
268  socket_from_id(socket_id).socket()->send(msg);
269 
270  post_send_hooks(marshalling_scheme, identifier, socket_id);
271 }
272 
273 void goby::common::ZeroMQService::handle_receive(const void* data, int size, int message_part,
274  int socket_id)
275 {
276  std::string bytes(static_cast<const char*>(data), size);
277 
278  glog.is(DEBUG3) && glog << group(glog_in_group())
279  << "Received message (hex): " << goby::util::hex_encode(bytes)
280  << std::endl;
281 
282  MarshallingScheme marshalling_scheme = MARSHALLING_UNKNOWN;
283  std::string identifier;
284  std::string body;
285 
286  switch (message_part)
287  {
288  case 0:
289  {
290  zeromq_packet_decode(bytes, &marshalling_scheme, &identifier, &body);
291 
292  glog.is(DEBUG3) && glog << group(glog_in_group()) << "Received message of type: ["
293  << identifier << "]" << std::endl;
294 
295  glog.is(DEBUG3) && glog << group(glog_in_group()) << "Body ["
296  << goby::util::hex_encode(body) << "]" << std::endl;
297 
298  if (socket_from_id(socket_id).check_blackout(marshalling_scheme, identifier))
299  {
300  inbox_signal_(marshalling_scheme, identifier, body, socket_id);
301  }
302  }
303  break;
304 
305  default:
306  throw(std::runtime_error(
307  "Got more parts to the message than expecting (expecting only 1)"));
308  break;
309  }
310 }
311 
312 bool goby::common::ZeroMQService::poll(long timeout /* = -1 */)
313 {
314  boost::mutex::scoped_lock slock(poll_mutex_);
315 
316  // glog.is(DEBUG2) && glog << "Have " << poll_items_.size() << " items to poll" << std::endl ;
317  bool had_events = false;
318  zmq::poll(&poll_items_[0], poll_items_.size(), timeout / ZMQ_POLL_DIVISOR);
319  for (int i = 0, n = poll_items_.size(); i < n; ++i)
320  {
321  if (poll_items_[i].revents & ZMQ_POLLIN)
322  {
323  int message_part = 0;
324  more_t more;
325  size_t more_size = sizeof(more_t);
326  do
327  {
328  /* Create an empty ØMQ message to hold the message part */
329  zmq_msg_t part;
330  int rc = zmq_msg_init(&part);
331 
332  if (rc == -1)
333  {
334  glog.is(DEBUG1) && glog << warn << "zmq_msg_init failed" << std::endl;
335  continue;
336  }
337 
338  /* Block until a message is available to be received from socket */
339  rc = zmq_msg_recv(&part, poll_items_[i].socket, 0);
340  glog.is(DEBUG3) && glog << group(glog_in_group()) << "Had event for poll item " << i
341  << std::endl;
342  poll_callbacks_[i](zmq_msg_data(&part), zmq_msg_size(&part), message_part);
343 
344  if (rc == -1)
345  {
346  glog.is(DEBUG1) && glog << warn << "zmq_recv failed" << std::endl;
347  continue;
348  }
349 
350  /* Determine if more message parts are to follow */
351  rc = zmq_getsockopt(poll_items_[i].socket, ZMQ_RCVMORE, &more, &more_size);
352 
353  if (rc == -1)
354  {
355  glog.is(DEBUG1) && glog << warn << "zmq_getsocketopt failed" << std::endl;
356  continue;
357  }
358 
359  zmq_msg_close(&part);
360  ++message_part;
361  } while (more);
362  had_events = true;
363  }
364  }
365  return had_events;
366 }
367 
368 void goby::common::ZeroMQSocket::set_global_blackout(boost::posix_time::time_duration duration)
369 {
370  glog.is(DEBUG2) && glog << group(ZeroMQService::glog_in_group()) << "Global blackout set to "
371  << duration << std::endl;
372  global_blackout_ = duration;
373  global_blackout_set_ = true;
374 }
375 
376 void goby::common::ZeroMQSocket::set_blackout(MarshallingScheme marshalling_scheme,
377  const std::string& identifier,
378  boost::posix_time::time_duration duration)
379 {
380  glog.is(DEBUG2) && glog << group(ZeroMQService::glog_in_group())
381  << "Blackout for marshalling scheme: " << marshalling_scheme
382  << " and identifier " << identifier << " set to " << duration
383  << std::endl;
384  blackout_info_[std::make_pair(marshalling_scheme, identifier)] = BlackoutInfo(duration);
385  local_blackout_set_ = true;
386 }
387 
388 bool goby::common::ZeroMQSocket::check_blackout(MarshallingScheme marshalling_scheme,
389  const std::string& identifier)
390 {
391  if (!local_blackout_set_ && !global_blackout_set_)
392  {
393  return true;
394  }
395  else
396  {
397  boost::posix_time::ptime this_time = goby::common::goby_time();
398 
399  BlackoutInfo& blackout_info =
400  blackout_info_[std::make_pair(marshalling_scheme, identifier)];
401 
402  const boost::posix_time::ptime& last_post_time = blackout_info.last_post_time;
403 
404  if ((local_blackout_set_ && // local blackout
405  this_time - last_post_time > blackout_info.blackout_interval))
406  {
407  blackout_info.last_post_time = this_time;
408  return true;
409  }
410  else if ((global_blackout_set_ && // global blackout
411  this_time - last_post_time > global_blackout_)) // fails global blackout
412  {
413  blackout_info.last_post_time = this_time;
414  return true;
415  }
416  else
417  {
418  glog.is(DEBUG3) && glog << group(ZeroMQService::glog_in_group())
419  << "Message (marshalling scheme: " << marshalling_scheme
420  << ", identifier: " << identifier << ")"
421  << " is in blackout: this time:" << this_time
422  << ", last time: " << last_post_time
423  << ", global blackout: " << global_blackout_
424  << ", local blackout: " << blackout_info.blackout_interval
425  << ", difference last and this: " << this_time - last_post_time
426  << std::endl;
427  return false;
428  }
429  }
430 }
void zeromq_packet_encode(std::string *raw, MarshallingScheme marshalling_scheme, const std::string &identifier, const std::string &body)
Encodes a packet for Goby over ZeroMQ.
Definition: zeromq_packet.h:49
ReturnType goby_time()
Returns current UTC time as a boost::posix_time::ptime.
Definition: time.h:104
void add_group(const std::string &name, Colors::Color color=Colors::nocolor, const std::string &description="")
Add another group to the logger. A group provides related manipulator for categorizing log messages...
common::FlexOstream glog
Access the Goby logger through this object.
simple exception class for goby applications
Definition: exception.h:32
void zeromq_packet_decode(const std::string &raw, MarshallingScheme *marshalling_scheme, std::string *identifier, std::string *body)
Decodes a packet for Goby over ZeroMQ.
Definition: zeromq_packet.h:57