Goby v2
moos_gateway.cpp
1 // Copyright 2009-2018 Toby Schneider (http://gobysoft.org/index.wt/people/toby)
2 // GobySoft, LLC (2013-)
3 // Massachusetts Institute of Technology (2007-2014)
4 //
5 //
6 // This file is part of the Goby Underwater Autonomy Project Binaries
7 // ("The Goby Binaries").
8 //
9 // The Goby Binaries are free software: you can redistribute them and/or modify
10 // them under the terms of the GNU General Public License as published by
11 // the Free Software Foundation, either version 2 of the License, or
12 // (at your option) any later version.
13 //
14 // The Goby Binaries are distributed in the hope that they will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 // GNU General Public License for more details.
18 //
19 // You should have received a copy of the GNU General Public License
20 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
21 
22 #include "goby/common/logger.h"
23 #include "goby/common/logger/term_color.h"
24 #include "goby/common/pubsub_node_wrapper.h"
25 #include "goby/common/zeromq_application_base.h"
26 #include "goby/moos/moos_node.h"
28 #include "goby/pb/protobuf_node.h"
29 #include "goby/pb/protobuf_pubsub_node_wrapper.h"
30 
31 #include "moos_gateway_config.pb.h"
32 
33 using namespace goby::common::logger;
34 
// Forward declarations of the MOOS connection callbacks. They live at global
// scope (the class befriends them as `::MOOSGateway_OnConnect` /
// `::MOOSGateway_OnDisconnect`) and receive the MOOSGateway instance as an
// opaque pointer, which the constructor registers together with `this`.
35 bool MOOSGateway_OnConnect(void* pParam);
36 bool MOOSGateway_OnDisconnect(void* pParam);
37 
38 namespace goby
39 {
40 namespace moos
41 {
// Gateway application that bridges a MOOSDB (through CMOOSCommClient) and the
// Goby publish/subscribe layer, either as raw CMOOSMsg objects or as protobuf
// messages, depending on the configuration.
//
// NOTE(review): this listing is missing the line that opens the class
// declaration (class name and first base), the line naming the last base, the
// constructor declaration and the `cfg_` member declaration. The constructor's
// initializer list suggests the bases are ZeroMQApplicationBase, MOOSNode and
// goby::pb::DynamicProtobufNode, in that order - confirm against the original file.
43  public MOOSNode,
45 {
46  public:
48  ~MOOSGateway();
49 
// The global connection callbacks need access to the private members
// (moos_client_, subscribed_vars_, moos_resubscribe_required_).
50  friend bool ::MOOSGateway_OnConnect(void* pParam);
51  friend bool ::MOOSGateway_OnDisconnect(void* pParam);
52 
53  private:
// Posts a CMOOSMsg to the MOOSDB after tagging it with this application's name.
54  void moos_inbox(CMOOSMsg& msg);
// Periodic work: refreshes subscriptions, fetches MOOS mail and republishes it.
55  void loop();
56 
// Registers for MOOS variables that pass the configured subscribe filters.
57  void check_for_new_moos_variables();
// True if `moos_variable` begins with one of the configured moos_subscribe_filter entries.
58  bool clears_subscribe_filters(const std::string& moos_variable);
59 
// Serializes a protobuf message and notifies every MOOS variable mapped to `group`.
60  void pb_inbox(boost::shared_ptr<google::protobuf::Message> msg, const std::string& group);
61 
62  private:
// Single ZeroMQ service shared by all instances; handed to each base class in the constructor.
63  static goby::common::ZeroMQService zeromq_service_;
65 
// Pub/sub client used when messages are forwarded as CMOOSMsg (pb_convert is false).
66  goby::common::PubSubNodeWrapper<CMOOSMsg> goby_moos_pubsub_client_;
// Connection to the MOOSDB.
67  CMOOSCommClient moos_client_;
// Pub/sub client used when messages are forwarded as protobuf (pb_convert is true).
68  goby::pb::DynamicProtobufPubSubNodeWrapper goby_pb_pubsub_client_;
69 
70  enum
71  {
// Number of one-second waits the constructor allows for the MOOSDB connection.
72  MAX_CONNECTION_TIMEOUT = 10
73  };
74 
// MOOS variables already registered with moos_client_; replayed on reconnect.
75  std::set<std::string> subscribed_vars_;
76 
77  // MOOS Var -> PB Group
78  std::multimap<std::string, std::string> moos2pb_;
79 
80  // PB Group -> MOOS Var
81  std::multimap<std::string, std::string> pb2moos_;
82 
// Set by the disconnect callback so that the next connect re-registers subscribed_vars_.
83  bool moos_resubscribe_required_;
84 };
85 } // namespace moos
86 } // namespace goby
87 
// Out-of-class definition of the static ZeroMQ service member declared in MOOSGateway.
88 goby::common::ZeroMQService goby::moos::MOOSGateway::zeromq_service_;
89 
90 int main(int argc, char* argv[])
91 {
93  goby::run<goby::moos::MOOSGateway>(argc, argv, &cfg);
94 }
95 
96 using goby::glog;
97 
// Constructs the gateway: initializes the bases and pub/sub clients, loads any
// configured shared libraries and .proto files, connects to the MOOSDB and sets
// up the configured subscriptions and MOOS <-> protobuf pairings.
//
// cfg: application configuration; dereferenced into cfg_ and also passed to
//      ZeroMQApplicationBase.
98 goby::moos::MOOSGateway::MOOSGateway(protobuf::MOOSGatewayConfig* cfg)
99  : ZeroMQApplicationBase(&zeromq_service_, cfg),
100  MOOSNode(&zeromq_service_), goby::pb::DynamicProtobufNode(&zeromq_service_), cfg_(*cfg),
// NOTE(review): the `?` branch of this conditional is missing from the listing
// (the line between 101 and 103) - restore it from the original file.
101  goby_moos_pubsub_client_(this, cfg_.pb_convert()
103  : cfg_.base().pubsub_config()),
// NOTE(review): the `:` branch and closing of this conditional are missing from
// the listing (the line between 105 and 107) - restore it from the original file.
104  goby_pb_pubsub_client_(this, cfg_.pb_convert()
105  ? cfg_.base().pubsub_config()
107  moos_resubscribe_required_(false)
108 {
109  goby::util::DynamicProtobufManager::enable_compilation();
110 
111  // load all shared libraries
112  for (int i = 0, n = cfg_.load_shared_library_size(); i < n; ++i)
113  {
114  glog.is(VERBOSE) && glog << "Loading shared library: " << cfg_.load_shared_library(i)
115  << std::endl;
116 
// The handle is never passed to dlclose in this function.
117  void* handle = dlopen(cfg_.load_shared_library(i).c_str(), RTLD_LAZY);
118  if (!handle)
119  {
120  glog << die << "Failed ... check path provided or add to /etc/ld.so.conf "
121  << "or LD_LIBRARY_PATH" << std::endl;
122  }
123  }
124 
125  // load all .proto files
126  for (int i = 0, n = cfg_.load_proto_file_size(); i < n; ++i)
127  {
128  glog.is(VERBOSE) && glog << "Loading protobuf file: " << cfg_.load_proto_file(i)
129  << std::endl;
130 
131  if (!goby::util::DynamicProtobufManager::find_descriptor(cfg_.load_proto_file(i)))
132  glog.is(DIE) && glog << "Failed to load file." << std::endl;
133  }
134 
// Register the connection callbacks with `this` as their opaque parameter, then
// start the MOOS client with the configured host, port, application name and comm tick.
135  moos_client_.SetOnConnectCallBack(MOOSGateway_OnConnect, this);
136  moos_client_.SetOnDisconnectCallBack(MOOSGateway_OnDisconnect, this);
137  moos_client_.Run(cfg_.moos_server_host().c_str(), cfg_.moos_server_port(),
138  cfg_.base().app_name().c_str(), cfg_.moos_comm_tick());
139 
140  glog.is(VERBOSE) && glog << "Waiting to connect to MOOSDB ... " << std::endl;
141 
// Poll once per second until connected; after MAX_CONNECTION_TIMEOUT polls a DIE
// message is logged (presumably terminating the process - confirm glog semantics).
142  int i = 0;
143  while (!moos_client_.IsConnected())
144  {
145  i++;
146  if (i > MAX_CONNECTION_TIMEOUT)
147  {
148  glog.is(DIE) && glog << "Failed to connect to MOOSDB in " << MAX_CONNECTION_TIMEOUT
149  << " seconds. Check `moos_server_host` and `moos_server_port`"
150  << std::endl;
151  }
152 
153  sleep(1);
154  }
155 
// Goby-side subscriptions go to whichever pub/sub client matches pb_convert.
156  for (int i = 0, n = cfg_.goby_subscribe_filter_size(); i < n; ++i)
157  {
158  if (!cfg_.pb_convert())
159  goby_moos_pubsub_client_.subscribe(cfg_.goby_subscribe_filter(i));
160  else
161  goby_pb_pubsub_client_.subscribe(cfg_.goby_subscribe_filter(i));
162  }
163 
// Build the two lookup tables from the configured bridge pairs. PB_TO_MOOS pairs
// subscribe pb_inbox to the protobuf group; MOOS_TO_PB pairs register the MOOS
// variable once (tracked in subscribed_vars_).
164  for (int i = 0, n = cfg_.pb_pair_size(); i < n; ++i)
165  {
166  if (cfg_.pb_pair(i).direction() ==
167  protobuf::MOOSGatewayConfig::ProtobufMOOSBridgePair::PB_TO_MOOS)
168  {
169  pb2moos_.insert(std::make_pair(cfg_.pb_pair(i).pb_group(), cfg_.pb_pair(i).moos_var()));
170  goby::pb::DynamicProtobufNode::subscribe(
171  goby::common::PubSubNodeWrapperBase::SOCKET_SUBSCRIBE,
172  boost::bind(&MOOSGateway::pb_inbox, this, _1, cfg_.pb_pair(i).pb_group()),
173  cfg_.pb_pair(i).pb_group());
174  }
175  else if (cfg_.pb_pair(i).direction() ==
176  protobuf::MOOSGatewayConfig::ProtobufMOOSBridgePair::MOOS_TO_PB)
177  {
178  moos2pb_.insert(std::make_pair(cfg_.pb_pair(i).moos_var(), cfg_.pb_pair(i).pb_group()));
179  if (!subscribed_vars_.count(cfg_.pb_pair(i).moos_var()))
180  {
181  moos_client_.Register(cfg_.pb_pair(i).moos_var(), 0);
182  subscribed_vars_.insert(cfg_.pb_pair(i).moos_var());
183  }
184  }
185  }
186 
// Log groups used by loop() ("from_moos") and moos_inbox() ("to_moos").
187  glog.add_group("from_moos", common::Colors::lt_magenta, "MOOS -> Goby");
188  glog.add_group("to_moos", common::Colors::lt_green, "Goby -> MOOS");
189 }
190 
// Destructor with an empty body: no explicit teardown is performed here.
191 goby::moos::MOOSGateway::~MOOSGateway() {}
192 
193 void goby::moos::MOOSGateway::moos_inbox(CMOOSMsg& msg)
194 {
195  // we wrote this, so ignore
196  if (msg.GetSourceAux().find(application_name()) != std::string::npos)
197  return;
198 
199  // identify us as the writer
200  msg.SetSourceAux(msg.GetSourceAux() + (msg.GetSourceAux().size() ? "/" : "") +
201  application_name());
202 
203  glog.is(VERBOSE) && glog << group("to_moos") << msg << std::endl;
204 
205  moos_client_.Post(msg);
206 }
207 
208 // from MinimalApplicationBase
209 void goby::moos::MOOSGateway::loop()
210 {
211  check_for_new_moos_variables();
212  std::list<CMOOSMsg> moos_msgs;
213 
214  moos_client_.Fetch(moos_msgs);
215 
216  BOOST_FOREACH (CMOOSMsg& msg, moos_msgs)
217  {
218  // we wrote this, so ignore
219  if (msg.GetSourceAux().find(application_name()) != std::string::npos)
220  continue;
221 
222  msg.SetSourceAux(msg.GetSourceAux() + (msg.GetSourceAux().size() ? "/" : "") +
223  application_name());
224 
225  glog.is(VERBOSE) && glog << group("from_moos") << msg << std::endl;
226 
227  if (!cfg_.pb_convert())
228  goby_moos_pubsub_client_.publish(msg);
229 
230  if (cfg_.pb_convert() && moos2pb_.count(msg.GetKey()))
231  {
232  boost::shared_ptr<google::protobuf::Message> pbmsg =
233  dynamic_parse_for_moos(msg.GetString());
234  if (pbmsg)
235  {
236  typedef std::multimap<std::string, std::string>::iterator It;
237  std::pair<It, It> it_range = moos2pb_.equal_range(msg.GetKey());
238  for (It it = it_range.first; it != it_range.second; ++it)
239  goby_pb_pubsub_client_.publish(*pbmsg, it->second);
240  }
241  }
242  }
243 }
244 
245 // adapted from CMOOSLogger::HandleWildCardLogging
246 // as MOOS has no "accepted" way of handling wild card subscriptions
247 void goby::moos::MOOSGateway::check_for_new_moos_variables()
248 {
249  const double DEFAULT_WILDCARD_TIME = 1.0;
250  static double dfLastWildCardTime = -1.0;
251  if (MOOSTime() - dfLastWildCardTime > DEFAULT_WILDCARD_TIME)
252  {
253  MOOSMSG_LIST InMail;
254  if (moos_client_.ServerRequest("VAR_SUMMARY", InMail, 2.0, false))
255  {
256  if (InMail.size() != 1)
257  {
258  glog.is(WARN) && glog << "ServerRequest for VAR_SUMMARY returned incorrect mail "
259  "size (should be one)";
260  return;
261  }
262 
263  std::vector<std::string> all_var;
264  std::string var_str(InMail.begin()->GetString());
265  boost::split(all_var, var_str, boost::is_any_of(","));
266 
267  BOOST_FOREACH (const std::string& s, all_var)
268  {
269  if (!subscribed_vars_.count(s))
270  {
271  if (clears_subscribe_filters(s))
272  {
273  glog.is(VERBOSE) && glog << "moos_client_.Register for " << s << std::endl;
274  moos_client_.Register(s, 0);
275  subscribed_vars_.insert(s);
276  }
277  }
278  }
279 
280  dfLastWildCardTime = MOOSTime();
281  }
282  }
283 }
284 
285 bool goby::moos::MOOSGateway::clears_subscribe_filters(const std::string& moos_variable)
286 {
287  for (int i = 0, n = cfg_.moos_subscribe_filter_size(); i < n; ++i)
288  {
289  if (moos_variable.compare(0, cfg_.moos_subscribe_filter(i).size(),
290  cfg_.moos_subscribe_filter(i)) == 0)
291  return true;
292  }
293  return false;
294 }
295 
296 void goby::moos::MOOSGateway::pb_inbox(boost::shared_ptr<google::protobuf::Message> msg,
297  const std::string& group)
298 {
299  glog.is(DEBUG2) && glog << "PB --> MOOS: Group: " << group
300  << ", msg type: " << msg->GetDescriptor()->full_name() << std::endl;
301 
302  std::string serialized;
303  serialize_for_moos(&serialized, *msg);
304 
305  typedef std::multimap<std::string, std::string>::iterator It;
306  std::pair<It, It> it_range = pb2moos_.equal_range(group);
307 
308  for (It it = it_range.first; it != it_range.second; ++it)
309  moos_client_.Notify(it->second, serialized);
310 }
311 
312 bool MOOSGateway_OnConnect(void* pParam)
313 {
314  if (pParam)
315  {
317 
318  if (pApp->moos_resubscribe_required_)
319  {
320  for (std::set<std::string>::const_iterator it = pApp->subscribed_vars_.begin(),
321  end = pApp->subscribed_vars_.end();
322  it != end; ++it)
323  {
324  pApp->moos_client_.Register(*it, 0);
325  glog.is(DEBUG2) && glog << "Resubscribing for: " << *it << std::endl;
326  }
327  }
328 
329  return true;
330  }
331  else
332  {
333  return false;
334  }
335 }
336 
337 bool MOOSGateway_OnDisconnect(void* pParam)
338 {
339  if (pParam)
340  {
342  pApp->moos_resubscribe_required_ = true;
343 
344  return true;
345  }
346  else
347  {
348  return false;
349  }
350 }
Helpers for MOOS applications for serializing and parsing Google Protocol Buffers messages.
void add_group(const std::string &name, Colors::Color color=Colors::nocolor, const std::string &description="")
Add another group to the logger. A group provides related manipulator for categorizing log messages...
common::FlexOstream glog
Access the Goby logger through this object.
The global namespace for the Goby project.