23 #include "protobuf_node.h" 25 #include "goby/common/logger.h" 31 void goby::pb::ProtobufNode::inbox(common::MarshallingScheme marshalling_scheme,
32 const std::string& identifier,
const std::string& body,
35 if (marshalling_scheme == common::MARSHALLING_PROTOBUF)
37 std::string::size_type first_slash = identifier.find(
"/");
38 std::string group = identifier.substr(0, first_slash);
39 std::string pb_full_name = identifier.substr(first_slash + 1);
40 pb_full_name.erase(pb_full_name.size() - 1);
42 glog.is(DEBUG3) &&
glog <<
"MARSHALLING_PROTOBUF type: [" << pb_full_name <<
"], group: [" 43 << group <<
"]" << std::endl;
45 protobuf_inbox(pb_full_name, body, socket_id, group);
50 const std::string& group)
52 if (!msg.IsInitialized())
54 glog.is(DEBUG1) &&
glog << warn <<
"Cannot send message of type [" 55 << msg.GetDescriptor()->full_name()
56 <<
"] because not all required fields are set." << std::endl;
61 msg.SerializeToString(&body);
63 std::string identifier = group +
"/" + msg.GetDescriptor()->full_name() +
"/";
65 zeromq_service()->send(common::MARSHALLING_PROTOBUF, identifier, body, socket_id);
68 void goby::pb::ProtobufNode::subscribe(
const std::string& protobuf_type_name,
int socket_id,
69 const std::string& group)
71 subscribe(group +
"/" + protobuf_type_name + (protobuf_type_name.empty() ?
"" :
"/"),
75 void goby::pb::ProtobufNode::subscribe(
const std::string& identifier,
int socket_id)
77 glog.is(DEBUG1) &&
glog <<
"Subscribing for MARSHALLING_PROTOBUF type: " << identifier
79 zeromq_service()->subscribe(common::MARSHALLING_PROTOBUF, identifier, socket_id);
82 void goby::pb::StaticProtobufNode::protobuf_inbox(
const std::string& protobuf_type_name,
83 const std::string& body,
int socket_id,
84 const std::string& group)
86 typedef boost::unordered_multimap<std::string, boost::shared_ptr<SubscriptionBase> >::iterator
89 std::pair<It, It> it_range = subscriptions_.equal_range(protobuf_type_name);
91 for (It it = it_range.first; it != it_range.second; ++it)
93 const std::string& current_group = it->second->group();
94 if (current_group.empty() || current_group == group)
95 it->second->post(body);
99 void goby::pb::DynamicProtobufNode::protobuf_inbox(
const std::string& protobuf_type_name,
100 const std::string& body,
int socket_id,
101 const std::string& group)
105 boost::shared_ptr<google::protobuf::Message> msg =
106 goby::util::DynamicProtobufManager::new_protobuf_message(protobuf_type_name);
107 msg->ParseFromString(body);
109 boost::unordered_multimap<
111 boost::function<void(boost::shared_ptr<google::protobuf::Message> msg)> >::iterator it =
112 subscriptions_.find(group);
114 if (it != subscriptions_.end())
117 catch (std::exception& e)
119 glog.is(WARN) &&
glog << e.what() << std::endl;
123 void goby::pb::DynamicProtobufNode::on_receipt(
124 int socket_id, boost::function<
void(boost::shared_ptr<google::protobuf::Message> msg)> handler,
125 const std::string& group)
128 glog.is(goby::common::logger::DEBUG1) &&
129 glog <<
"registering on_receipt handler for group: " << group << std::endl;
131 subscriptions_.insert(std::make_pair(group, handler));
134 void goby::pb::DynamicProtobufNode::subscribe(
135 int socket_id, boost::function<
void(boost::shared_ptr<google::protobuf::Message> msg)> handler,
136 const std::string& group)
138 on_receipt(socket_id, handler, group);
139 ProtobufNode::subscribe(
"", socket_id, group);
// common::FlexOstream glog: access the Goby logger through this object.