Goby v2
liaison.cpp
1 // Copyright 2009-2018 Toby Schneider (http://gobysoft.org/index.wt/people/toby)
2 // GobySoft, LLC (2013-)
3 // Massachusetts Institute of Technology (2007-2014)
4 //
5 //
6 // This file is part of the Goby Underwater Autonomy Project Binaries
7 // ("The Goby Binaries").
8 //
9 // The Goby Binaries are free software: you can redistribute them and/or modify
10 // them under the terms of the GNU General Public License as published by
11 // the Free Software Foundation, either version 2 of the License, or
12 // (at your option) any later version.
13 //
14 // The Goby Binaries are distributed in the hope that they will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 // GNU General Public License for more details.
18 //
19 // You should have received a copy of the GNU General Public License
20 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
21 
22 #include <boost/filesystem.hpp>
23 
24 #include <Wt/WAnchor>
25 #include <Wt/WHBoxLayout>
26 #include <Wt/WImage>
27 #include <Wt/WStackedWidget>
28 #include <Wt/WText>
29 #include <Wt/WVBoxLayout>
30 
31 #include "goby/common/time.h"
32 #include "goby/util/dynamic_protobuf_manager.h"
33 
34 #include "liaison.h"
35 #include "liaison_wt_thread.h"
36 
37 using goby::glog;
38 using namespace Wt;
39 using namespace goby::common::logger;
40 
41 boost::shared_ptr<zmq::context_t> goby::common::Liaison::zmq_context_(new zmq::context_t(1));
42 std::vector<void*> goby::common::Liaison::plugin_handles_;
43 
44 goby::common::protobuf::LiaisonConfig goby::common::liaison_cfg_;
45 
46 int main(int argc, char* argv[])
47 {
48  glog.set_lock_action(goby::common::logger_lock::lock);
49 
50  // load plugins from environmental variable GOBY_LIAISON_PLUGINS
51  char* plugins = getenv("GOBY_LIAISON_PLUGINS");
52  if (plugins)
53  {
54  std::string s_plugins(plugins);
55  std::vector<std::string> plugin_vec;
56  boost::split(plugin_vec, s_plugins, boost::is_any_of(";:,"));
57 
58  for (int i = 0, n = plugin_vec.size(); i < n; ++i)
59  {
60  glog.is(VERBOSE) && glog << "Loading liaison plugin library: " << plugin_vec[i]
61  << std::endl;
62  void* handle = dlopen(plugin_vec[i].c_str(), RTLD_LAZY);
63  if (handle)
64  goby::common::Liaison::plugin_handles_.push_back(handle);
65  else
66  glog.is(DIE) && glog << "Failed to open library: " << plugin_vec[i] << std::endl;
67  }
68  }
69 
70  int return_value = goby::run<goby::common::Liaison>(argc, argv, &goby::common::liaison_cfg_);
71  goby::util::DynamicProtobufManager::protobuf_shutdown();
72 
73  for (int i = 0, n = goby::common::Liaison::plugin_handles_.size(); i < n; ++i)
74  dlclose(goby::common::Liaison::plugin_handles_[i]);
75 
76  return return_value;
77 }
78 
79 goby::common::Liaison::Liaison(protobuf::LiaisonConfig* cfg)
80  : ZeroMQApplicationBase(&zeromq_service_, cfg), zeromq_service_(zmq_context_),
81  pubsub_node_(&zeromq_service_, cfg->base().pubsub_config())
82 {
83  // load all shared libraries
84  for (int i = 0, n = cfg->load_shared_library_size(); i < n; ++i)
85  {
86  glog.is(VERBOSE) && glog << "Loading shared library: " << cfg->load_shared_library(i)
87  << std::endl;
88 
89  void* handle =
90  goby::util::DynamicProtobufManager::load_from_shared_lib(cfg->load_shared_library(i));
91 
92  if (!handle)
93  {
94  glog.is(DIE) && glog << "Failed ... check path provided or add to /etc/ld.so.conf "
95  << "or LD_LIBRARY_PATH" << std::endl;
96  }
97  }
98 
99  // load all .proto files
100  goby::util::DynamicProtobufManager::enable_compilation();
101  for (int i = 0, n = cfg->load_proto_file_size(); i < n; ++i)
102  load_proto_file(cfg->load_proto_file(i));
103 
104  // load all .proto file directories
105  for (int i = 0, n = cfg->load_proto_dir_size(); i < n; ++i)
106  {
107  boost::filesystem::path current_dir(cfg->load_proto_dir(i));
108 
109  for (boost::filesystem::directory_iterator iter(current_dir), end; iter != end; ++iter)
110  {
111 #if BOOST_FILESYSTEM_VERSION == 3
112  if (iter->path().extension().string() == ".proto")
113 #else
114  if (iter->path().extension() == ".proto")
115 #endif
116 
117  load_proto_file(iter->path().string());
118  }
119  }
120 
121  pubsub_node_.subscribe_all();
122  zeromq_service_.connect_inbox_slot(&Liaison::inbox, this);
123 
124  protobuf::ZeroMQServiceConfig ipc_sockets;
125  protobuf::ZeroMQServiceConfig::Socket* internal_publish_socket = ipc_sockets.add_socket();
126  internal_publish_socket->set_socket_type(protobuf::ZeroMQServiceConfig::Socket::PUBLISH);
127  internal_publish_socket->set_socket_id(LIAISON_INTERNAL_PUBLISH_SOCKET);
128  internal_publish_socket->set_transport(protobuf::ZeroMQServiceConfig::Socket::INPROC);
129  internal_publish_socket->set_connect_or_bind(protobuf::ZeroMQServiceConfig::Socket::BIND);
130  internal_publish_socket->set_socket_name(liaison_internal_publish_socket_name());
131 
132  protobuf::ZeroMQServiceConfig::Socket* internal_subscribe_socket = ipc_sockets.add_socket();
133  internal_subscribe_socket->set_socket_type(protobuf::ZeroMQServiceConfig::Socket::SUBSCRIBE);
134  internal_subscribe_socket->set_socket_id(LIAISON_INTERNAL_SUBSCRIBE_SOCKET);
135  internal_subscribe_socket->set_transport(protobuf::ZeroMQServiceConfig::Socket::INPROC);
136  internal_subscribe_socket->set_connect_or_bind(protobuf::ZeroMQServiceConfig::Socket::BIND);
137  internal_subscribe_socket->set_socket_name(liaison_internal_subscribe_socket_name());
138 
139  zeromq_service_.merge_cfg(ipc_sockets);
140  zeromq_service_.subscribe_all(LIAISON_INTERNAL_SUBSCRIBE_SOCKET);
141 
142  try
143  {
144  std::string doc_root;
145 
146  if (cfg->has_docroot())
147  doc_root = cfg->docroot();
148  else if (boost::filesystem::exists(boost::filesystem::path(GOBY_LIAISON_COMPILED_DOCROOT)))
149  doc_root = GOBY_LIAISON_COMPILED_DOCROOT;
150  else if (boost::filesystem::exists(boost::filesystem::path(GOBY_LIAISON_INSTALLED_DOCROOT)))
151  doc_root = GOBY_LIAISON_INSTALLED_DOCROOT;
152  else
153  throw(std::runtime_error("No valid docroot found for Goby Liaison. Set docroot to the "
154  "valid path to what is normally /usr/share/goby/liaison"));
155 
156  // create a set of fake argc / argv for Wt::WServer
157  std::vector<std::string> wt_argv_vec;
158  std::string str = cfg->base().app_name() + " --docroot " + doc_root + " --http-port " +
159  goby::util::as<std::string>(cfg->http_port()) + " --http-address " +
160  cfg->http_address() + " " + cfg->additional_wt_http_params();
161  boost::split(wt_argv_vec, str, boost::is_any_of(" "));
162 
163  char* wt_argv[wt_argv_vec.size()];
164 
165  glog.is(DEBUG1) && glog << "setting Wt cfg to: " << std::flush;
166  for (int i = 0, n = wt_argv_vec.size(); i < n; ++i)
167  {
168  wt_argv[i] = new char[wt_argv_vec[i].size() + 1];
169  strcpy(wt_argv[i], wt_argv_vec[i].c_str());
170  glog.is(DEBUG1) && glog << "\t" << wt_argv[i] << std::endl;
171  }
172 
173  wt_server_.setServerConfiguration(wt_argv_vec.size(), wt_argv);
174 
175  // delete our fake argv
176  for (int i = 0, n = wt_argv_vec.size(); i < n; ++i) delete[] wt_argv[i];
177 
178  wt_server_.addEntryPoint(Wt::Application, goby::common::create_wt_application);
179 
180  if (!wt_server_.start())
181  {
182  glog.is(DIE) && glog << "Could not start Wt HTTP server." << std::endl;
183  }
184  }
185  catch (Wt::WServer::Exception& e)
186  {
187  glog.is(DIE) && glog << "Could not start Wt HTTP server. Exception: " << e.what()
188  << std::endl;
189  }
190 }
191 
192 void goby::common::Liaison::load_proto_file(const std::string& path)
193 {
194 #if BOOST_FILESYSTEM_VERSION == 3
195  boost::filesystem::path bpath = boost::filesystem::absolute(path);
196 #else
197  boost::filesystem::path bpath = boost::filesystem::complete(path);
198 #endif
199  bpath.normalize();
200 
201  glog.is(VERBOSE) && glog << "Loading protobuf file: " << bpath << std::endl;
202 
203  if (!goby::util::DynamicProtobufManager::user_descriptor_pool().FindFileByName(bpath.string()))
204  glog.is(DIE) && glog << "Failed to load file." << std::endl;
205 }
206 
207 void goby::common::Liaison::loop()
208 {
209  // static int i = 0;
210  // i++;
211  // if(i > (20 * cfg_.base().loop_freq()))
212  // {
213  // wt_server_.stop();
214  // quit();
215  // }
216 }
217 
218 void goby::common::Liaison::inbox(MarshallingScheme marshalling_scheme,
219  const std::string& identifier, const std::string& data,
220  int socket_id)
221 {
222  glog.is(DEBUG2) && glog << "Liaison: got message with identifier: " << identifier
223  << " from socket: " << socket_id << std::endl;
224  zeromq_service_.send(marshalling_scheme, identifier, data, LIAISON_INTERNAL_PUBLISH_SOCKET);
225 
226  if (socket_id == LIAISON_INTERNAL_SUBSCRIBE_SOCKET)
227  {
228  glog.is(DEBUG2) && glog << "Sending to pubsub node: " << identifier << std::endl;
229  pubsub_node_.publish(marshalling_scheme, identifier, data);
230  }
231 }
common::FlexOstream glog
Access the Goby logger through this object.