Goby v2
protobuf_node.cpp
1 // Copyright 2009-2018 Toby Schneider (http://gobysoft.org/index.wt/people/toby)
2 // GobySoft, LLC (2013-)
3 // Massachusetts Institute of Technology (2007-2014)
4 // Community contributors (see AUTHORS file)
5 //
6 //
7 // This file is part of the Goby Underwater Autonomy Project Libraries
8 // ("The Goby Libraries").
9 //
10 // The Goby Libraries are free software: you can redistribute them and/or modify
11 // them under the terms of the GNU Lesser General Public License as published by
12 // the Free Software Foundation, either version 2.1 of the License, or
13 // (at your option) any later version.
14 //
15 // The Goby Libraries are distributed in the hope that they will be useful,
16 // but WITHOUT ANY WARRANTY; without even the implied warranty of
17 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 // GNU Lesser General Public License for more details.
19 //
20 // You should have received a copy of the GNU Lesser General Public License
21 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
22 
23 #include "protobuf_node.h"
24 
25 #include "goby/common/logger.h"
26 
27 using goby::glog;
28 using goby::util::as;
29 using namespace goby::common::logger;
30 
31 void goby::pb::ProtobufNode::inbox(common::MarshallingScheme marshalling_scheme,
32  const std::string& identifier, const std::string& body,
33  int socket_id)
34 {
35  if (marshalling_scheme == common::MARSHALLING_PROTOBUF)
36  {
37  std::string::size_type first_slash = identifier.find("/");
38  std::string group = identifier.substr(0, first_slash);
39  std::string pb_full_name = identifier.substr(first_slash + 1);
40  pb_full_name.erase(pb_full_name.size() - 1); // final slash
41 
42  glog.is(DEBUG3) && glog << "MARSHALLING_PROTOBUF type: [" << pb_full_name << "], group: ["
43  << group << "]" << std::endl;
44 
45  protobuf_inbox(pb_full_name, body, socket_id, group);
46  }
47 }
48 
49 void goby::pb::ProtobufNode::send(const google::protobuf::Message& msg, int socket_id,
50  const std::string& group)
51 {
52  if (!msg.IsInitialized())
53  {
54  glog.is(DEBUG1) && glog << warn << "Cannot send message of type ["
55  << msg.GetDescriptor()->full_name()
56  << "] because not all required fields are set." << std::endl;
57  return;
58  }
59 
60  std::string body;
61  msg.SerializeToString(&body);
62 
63  std::string identifier = group + "/" + msg.GetDescriptor()->full_name() + "/";
64 
65  zeromq_service()->send(common::MARSHALLING_PROTOBUF, identifier, body, socket_id);
66 }
67 
68 void goby::pb::ProtobufNode::subscribe(const std::string& protobuf_type_name, int socket_id,
69  const std::string& group)
70 {
71  subscribe(group + "/" + protobuf_type_name + (protobuf_type_name.empty() ? "" : "/"),
72  socket_id);
73 }
74 
75 void goby::pb::ProtobufNode::subscribe(const std::string& identifier, int socket_id)
76 {
77  glog.is(DEBUG1) && glog << "Subscribing for MARSHALLING_PROTOBUF type: " << identifier
78  << std::endl;
79  zeromq_service()->subscribe(common::MARSHALLING_PROTOBUF, identifier, socket_id);
80 }
81 
82 void goby::pb::StaticProtobufNode::protobuf_inbox(const std::string& protobuf_type_name,
83  const std::string& body, int socket_id,
84  const std::string& group)
85 {
86  typedef boost::unordered_multimap<std::string, boost::shared_ptr<SubscriptionBase> >::iterator
87  It;
88 
89  std::pair<It, It> it_range = subscriptions_.equal_range(protobuf_type_name);
90 
91  for (It it = it_range.first; it != it_range.second; ++it)
92  {
93  const std::string& current_group = it->second->group();
94  if (current_group.empty() || current_group == group)
95  it->second->post(body);
96  }
97 }
98 
99 void goby::pb::DynamicProtobufNode::protobuf_inbox(const std::string& protobuf_type_name,
100  const std::string& body, int socket_id,
101  const std::string& group)
102 {
103  try
104  {
105  boost::shared_ptr<google::protobuf::Message> msg =
106  goby::util::DynamicProtobufManager::new_protobuf_message(protobuf_type_name);
107  msg->ParseFromString(body);
108 
109  boost::unordered_multimap<
110  std::string,
111  boost::function<void(boost::shared_ptr<google::protobuf::Message> msg)> >::iterator it =
112  subscriptions_.find(group);
113 
114  if (it != subscriptions_.end())
115  it->second(msg);
116  }
117  catch (std::exception& e)
118  {
119  glog.is(WARN) && glog << e.what() << std::endl;
120  }
121 }
122 
123 void goby::pb::DynamicProtobufNode::on_receipt(
124  int socket_id, boost::function<void(boost::shared_ptr<google::protobuf::Message> msg)> handler,
125  const std::string& group)
126 {
127  using goby::glog;
128  glog.is(goby::common::logger::DEBUG1) &&
129  glog << "registering on_receipt handler for group: " << group << std::endl;
130 
131  subscriptions_.insert(std::make_pair(group, handler));
132 }
133 
134 void goby::pb::DynamicProtobufNode::subscribe(
135  int socket_id, boost::function<void(boost::shared_ptr<google::protobuf::Message> msg)> handler,
136  const std::string& group)
137 {
138  on_receipt(socket_id, handler, group);
139  ProtobufNode::subscribe("", socket_id, group);
140 }
common::FlexOstream glog
Access the Goby logger through this object.