Goby3 3.4.0
2026.04.13
Loading...
Searching...
No Matches
publish_subscribe_tool.h
Go to the documentation of this file.
1// Copyright 2026:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6//
7//
8// This file is part of the Goby Underwater Autonomy Project Libraries
9// ("The Goby Libraries").
10//
11// The Goby Libraries are free software: you can redistribute them and/or modify
12// them under the terms of the GNU Lesser General Public License as published by
13// the Free Software Foundation, either version 2.1 of the License, or
14// (at your option) any later version.
15//
16// The Goby Libraries are distributed in the hope that they will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU Lesser General Public License for more details.
20//
21// You should have received a copy of the GNU Lesser General Public License
22// along with Goby. If not, see <http://www.gnu.org/licenses/>.
23
24#ifndef GOBY_MIDDLEWARE_TOOL_PUBLISH_SUBSCRIBE_TOOL_H
25#define GOBY_MIDDLEWARE_TOOL_PUBLISH_SUBSCRIBE_TOOL_H
26
27#include <map>
28#include <memory>
29#include <regex>
30#include <set>
31#include <string>
32#include <vector>
33
34#include <google/protobuf/text_format.h>
35
36#include "dccl/dynamic_protobuf_manager.h"
37
46#include "goby/time/convert.h"
49
50namespace goby
51{
52namespace middleware
53{
54namespace tool
55{
56
// Publish a single message on the interprocess layer using the provided publish config.
//
// interprocess: object on which publish_dynamic() / publish_serialized() are called.
// cfg:          publish configuration; this function reads cfg.type() and cfg.value().
// tool_name:    only used in the "not implemented" log message of the default branch.
//
// cfg.type() is interpreted either as "<scheme>/<type>" (split at the first '/') or, when it
// contains no '/', as one of the special cases handled below.
//
// NOTE(review): this listing omits several source lines (80, 84, 95, 98-99, 110, 113, 115, 118,
// 123). The switch case labels, the declaration of `group`, and the body of the "JSON" special
// case are therefore not visible here; confirm against the real header.
65template <typename InterprocessType, typename PublishConfig>
66void publish_tool_impl(InterprocessType& interprocess, const PublishConfig& cfg,
67 const std::string& tool_name)
68{
69 using goby::glog;
70 std::string type_scheme_str = cfg.type();
71 std::string type;
72 int scheme{0};
73
74 std::string::size_type slash_pos = type_scheme_str.find('/');
75 if (slash_pos == std::string::npos)
76 {
// No '/' present: only the two shorthand spellings below are recognized.
77 // special cases
78 if (type_scheme_str == "JSON")
79 {
// NOTE(review): the body of this branch (listing line 80) is not shown; presumably it
// selects the JSON scheme - confirm.
81 }
// Matches "protobuf." anywhere in the string (find() != npos), not only as a prefix.
82 else if (type_scheme_str.find("protobuf.") != std::string::npos)
83 {
// NOTE(review): listing line 84 is not shown; presumably it sets `scheme` - confirm.
85 type = type_scheme_str;
86 }
87 }
88 else
89 {
// "<scheme>/<type>": the text before the first '/' is converted to a scheme id, and
// everything after it is taken as the type name.
90 scheme =
91 goby::middleware::MarshallingScheme::from_string(type_scheme_str.substr(0, slash_pos));
92 type = type_scheme_str.substr(slash_pos + 1);
93 }
94
// NOTE(review): `group`, used by every publish call below, is declared on a line that is
// missing from this listing (95).
96 switch (scheme)
97 {
// NOTE(review): the case label(s) for this branch (listing lines 98-99) are not shown.
// The branch builds a protobuf message of `type` and fills it from cfg.value().
100 {
101 // use TextFormat
102 auto pb_msg = dccl::DynamicProtobufManager::new_protobuf_message<
103 std::shared_ptr<google::protobuf::Message>>(type);
104 google::protobuf::TextFormat::Parser parser;
105 goby::util::FlexOStreamErrorCollector error_collector(cfg.value());
106 parser.RecordErrorsTo(&error_collector);
107 parser.AllowPartialMessage(false);
// NOTE(review): the result of ParseFromString is not captured on this line; whether a
// parse failure is handled cannot be determined from the visible lines.
108 parser.ParseFromString(cfg.value(), pb_msg.get());
109
// Two publish_dynamic calls follow. The lines that choose between them (110, 115) and
// the remaining template arguments (113, 118) are missing from this listing.
111 interprocess
112 .template publish_dynamic<google::protobuf::Message,
114 group);
116 interprocess
117 .template publish_dynamic<google::protobuf::Message,
119 group);
120 break;
121 }
122
// NOTE(review): the case label for this branch (listing line 123) is not shown.
// The branch parses cfg.value() as JSON text.
124 {
125 auto j = nlohmann::json::parse(cfg.value());
// With no type name (or the generic "nlohmann::json") the parsed object is published
// directly; otherwise it is serialized and published under the given type name.
126 if (type.empty() || type == "nlohmann::json")
127 {
128 interprocess.template publish_dynamic<nlohmann::json>(j, group);
129 }
130 else
131 {
132 // allow for specialized types, e.g. goby_json_type = "NavigationReport";
133 std::vector<char> bytes = goby::middleware::SerializerParserHelper<
134 nlohmann::json, goby::middleware::MarshallingScheme::JSON>::serialize(j);
135
136 interprocess.publish_serialized(type, goby::middleware::MarshallingScheme::JSON,
137 bytes, group);
138 }
139
140 break;
141 }
142
// Any scheme not handled above: write a message to glog, guarded by glog.is_die().
143 default:
144 glog.is_die() && glog << "Scheme " << scheme << " is not implemented for '" << tool_name
145 << "'" << std::endl;
146 }
147}
148
// Set up subscriptions for the interprocess subscribe tool.
//
// interprocess:         object on which subscribe_regex() is called.
// cfg:                  subscribe configuration; this function reads has_scheme(),
//                       include_internal_groups(), type_regex() and group_regex().
// plugins:              map from scheme id to log plugin, used by the callback to turn a
//                       received message into debug text.
// internal_group_regex: when non-empty, groups that fully match this pattern are skipped
//                       unless cfg.include_internal_groups() is true. Defaults to "".
//
// NOTE(review): this listing omits several source lines (160, 165, 169, 173, 175, 177, 183,
// 207). The function name line, the declarations of `schemes` and `scheme`, the targets of the
// three make_unique assignments, the last lambda parameter and the catch clause are not
// visible here; confirm against the real header.
159template <typename InterprocessType, typename SubscribeConfig>
161 InterprocessType& interprocess, const SubscribeConfig& cfg,
162 std::map<int, std::unique_ptr<goby::middleware::log::LogPlugin>>& plugins,
163 const std::string& internal_group_regex = "")
164{
166
// If the config names a scheme, restrict the subscription to that single scheme.
167 if (cfg.has_scheme())
168 {
170 schemes = {scheme};
171 }
172
// Create the Protobuf, DCCL and JSON log plugins.
// NOTE(review): the left-hand sides of these assignments are on missing lines; presumably
// they are entries of `plugins` - confirm.
174 std::make_unique<goby::middleware::log::ProtobufPlugin>();
176 std::make_unique<goby::middleware::log::DCCLPlugin>();
178 std::make_unique<goby::middleware::log::JSONPlugin>();
179
// The callback captures `cfg` and `plugins` by reference and `internal_group_regex` by copy.
// NOTE(review): because of the reference captures, the caller presumably must keep `cfg` and
// `plugins` alive for as long as the subscription can fire - confirm.
180 interprocess.subscribe_regex(
181 [&cfg, &plugins, internal_group_regex](const std::vector<unsigned char>& bytes, int scheme,
182 const std::string& type,
184 {
// Optional filtering of internal groups. The std::regex is constructed on every
// invocation of the callback, and regex_match requires the whole group name to match.
185 if (!internal_group_regex.empty())
186 {
187 std::regex exclude_pattern(internal_group_regex);
188 if (std::regex_match(std::string(group), exclude_pattern) &&
189 !cfg.include_internal_groups())
190 return;
191 }
192
193 goby::middleware::log::LogEntry log_entry(bytes, scheme, type, group);
194 std::string debug_text;
195
// Look up a plugin for this entry's scheme. Without one, only the byte count is reported.
196 auto plugin = plugins.find(log_entry.scheme());
197 if (plugin == plugins.end())
198 {
199 debug_text = std::string("Message of " + std::to_string(bytes.size()) + " bytes");
200 }
201 else
202 {
203 try
204 {
205 debug_text = plugin->second->debug_text_message(log_entry);
206 }
// NOTE(review): the catch clause (listing line 207) is not shown; the handler below uses
// an exception object named `e` and reports e.what() instead of the decoded text.
208 {
209 debug_text = "Unable to parse message of " +
210 std::to_string(log_entry.data().size()) +
211 " bytes. Reason: " + e.what();
212 }
213 }
214
// One line per message on stdout: scheme | group | type | timestamp | debug text.
215 // use similar format to goby_log_tool DEBUG_TEXT
216 std::cout << scheme << " | " << group << " | " << type << " | "
217 << goby::time::convert<boost::posix_time::ptime>(log_entry.timestamp())
218 << " | " << debug_text << std::endl;
219 },
// Subscription filters: the scheme set plus the type and group regexes from the config.
220 schemes, cfg.type_regex(), cfg.group_regex());
221}
222
223} // namespace tool
224} // namespace middleware
225} // namespace goby
226
227#endif
Implementation of Group for dynamic (run-time) instantiations. Use Group directly for static (compile...
Definition group.h:120
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:60
goby::util::logger::GroupSetter group(std::string n)
void subscribe_tool_impl(InterprocessType &interprocess, const SubscribeConfig &cfg, std::map< int, std::unique_ptr< goby::middleware::log::LogPlugin > > &plugins, const std::string &internal_group_regex="")
Set up subscriptions for the interprocess subscribe tool.
void publish_tool_impl(InterprocessType &interprocess, const PublishConfig &cfg, const std::string &tool_name)
Publish a single message on the interprocess layer using the provided publish config.
constexpr int scheme()
Placeholder to provide an interface for the scheme() function family.
Definition cstr.h:65
The global namespace for the Goby project.
util::FlexOstream glog
Access the Goby logger through this object.
static int from_string(const std::string &s)
Convert from a string to a marshalling scheme id.
Definition interface.h:77
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
Definition interface.h:98