// Goby v2
// pubsub_node_wrapper.h
1 // Copyright 2009-2018 Toby Schneider (http://gobysoft.org/index.wt/people/toby)
2 // GobySoft, LLC (2013-)
3 // Massachusetts Institute of Technology (2007-2014)
4 // Community contributors (see AUTHORS file)
5 //
6 //
7 // This file is part of the Goby Underwater Autonomy Project Libraries
8 // ("The Goby Libraries").
9 //
10 // The Goby Libraries are free software: you can redistribute them and/or modify
11 // them under the terms of the GNU Lesser General Public License as published by
12 // the Free Software Foundation, either version 2.1 of the License, or
13 // (at your option) any later version.
14 //
15 // The Goby Libraries are distributed in the hope that they will be useful,
16 // but WITHOUT ANY WARRANTY; without even the implied warranty of
17 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 // GNU Lesser General Public License for more details.
19 //
20 // You should have received a copy of the GNU Lesser General Public License
21 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
22 
23 #ifndef PUBSUBNODE20110506H
24 #define PUBSUBNODE20110506H
25 
26 #include <google/protobuf/message.h>
27 
28 #include "goby/common/node_interface.h"
29 #include "goby/common/protobuf/pubsub_node_config.pb.h"
30 
31 namespace goby
32 {
33 namespace common
34 {
36 {
37  public:
39  : zeromq_service_(*service)
40  {
41  set_cfg(cfg);
42  }
43 
44  virtual ~PubSubNodeWrapperBase() {}
45 
46  void publish(MarshallingScheme marshalling_scheme, const std::string& identifier,
47  const std::string& body)
48  {
49  if (!using_pubsub())
50  {
51  glog.is(goby::common::logger::WARN) &&
52  glog << "Ignoring publish since we have `using_pubsub`=false" << std::endl;
53  return;
54  }
55 
56  zeromq_service_.send(marshalling_scheme, identifier, body, SOCKET_PUBLISH);
57  }
58 
59  void subscribe(MarshallingScheme marshalling_scheme, const std::string& identifier)
60  {
61  if (!using_pubsub())
62  {
63  glog.is(goby::common::logger::WARN) &&
64  glog << "Ignoring subscribe since we have `using_pubsub`=false" << std::endl;
65  return;
66  }
67 
68  zeromq_service_.subscribe(marshalling_scheme, identifier, SOCKET_SUBSCRIBE);
69  }
70 
71  void subscribe_all()
72  {
73  if (!using_pubsub())
74  {
75  glog.is(goby::common::logger::WARN) &&
76  glog << "Ignoring subscribe since we have `using_pubsub`=false" << std::endl;
77  return;
78  }
79 
80  zeromq_service_.subscribe_all(SOCKET_SUBSCRIBE);
81  }
82 
83  bool using_pubsub() const { return cfg_.has_publish_socket() && cfg_.has_subscribe_socket(); }
84 
85  enum
86  {
87  SOCKET_SUBSCRIBE = 103998,
88  SOCKET_PUBLISH = 103999
89  };
90 
91  protected:
92  const protobuf::PubSubSocketConfig& cfg() const { return cfg_; }
93 
94  private:
95  void set_cfg(const protobuf::PubSubSocketConfig& cfg)
96  {
97  cfg_ = cfg;
98 
100 
101  using goby::glog;
102  if (using_pubsub())
103  {
104  glog.is(goby::common::logger::DEBUG1) && glog << "Using publish / subscribe."
105  << std::endl;
106 
108  pubsub_cfg.add_socket();
109  subscriber_socket->CopyFrom(cfg_.subscribe_socket());
110  subscriber_socket->set_socket_type(
111  goby::common::protobuf::ZeroMQServiceConfig::Socket::SUBSCRIBE);
112  subscriber_socket->set_socket_id(SOCKET_SUBSCRIBE);
113 
114  glog.is(goby::common::logger::DEBUG1) &&
115  glog << "Subscriber socket: " << subscriber_socket->DebugString() << std::endl;
116 
118  pubsub_cfg.add_socket();
119  publisher_socket->CopyFrom(cfg_.publish_socket());
120  publisher_socket->set_socket_type(
121  goby::common::protobuf::ZeroMQServiceConfig::Socket::PUBLISH);
122  publisher_socket->set_socket_id(SOCKET_PUBLISH);
123 
124  glog.is(goby::common::logger::DEBUG1) &&
125  glog << "Publisher socket: " << publisher_socket->DebugString() << std::endl;
126  }
127  else
128  {
129  glog.is(goby::common::logger::DEBUG1) &&
130  glog << "Not using publish / subscribe. Set publish_socket and subscribe_socket to "
131  "enable publish / subscribe."
132  << std::endl;
133  }
134 
135  zeromq_service_.merge_cfg(pubsub_cfg);
136  }
137 
138  private:
139  ZeroMQService& zeromq_service_;
141 };
142 
143 template <typename NodeTypeBase> class PubSubNodeWrapper : public PubSubNodeWrapperBase
144 {
145  public:
147  : PubSubNodeWrapperBase(node->zeromq_service(), cfg), node_(*node)
148  {
149  }
150 
151  virtual ~PubSubNodeWrapper() {}
152 
154 
155 
159  void publish(const NodeTypeBase& msg, const std::string& group = "")
160  {
161  if (!using_pubsub())
162  {
163  glog.is(goby::common::logger::WARN) &&
164  glog << "Ignoring publish since we have `using_pubsub`=false" << std::endl;
165  return;
166  }
167 
168  node_.send(msg, SOCKET_PUBLISH, group);
169  }
170 
171  void subscribe(const std::string& identifier)
172  {
173  if (!using_pubsub())
174  {
175  glog.is(goby::common::logger::WARN) &&
176  glog << "Ignoring subscribe since we have `using_pubsub`=false" << std::endl;
177  return;
178  }
179 
180  node_.subscribe(identifier, SOCKET_SUBSCRIBE);
181  }
182 
183  protected:
184  private:
186 };
187 
188 } // namespace common
189 } // namespace goby
190 
191 #endif
// Referenced symbols:
// - goby::glog (common::FlexOstream): access the Goby logger through this object.
// - goby: the global namespace for the Goby project.
// - void publish(const NodeTypeBase& msg, const std::string& group = ""): publish a message
//   (of any type derived from google::protobuf::Message).