67 const std::string& tool_name)
70 std::string type_scheme_str = cfg.type();
74 std::string::size_type slash_pos = type_scheme_str.find(
'/');
75 if (slash_pos == std::string::npos)
78 if (type_scheme_str ==
"JSON")
82 else if (type_scheme_str.find(
"protobuf.") != std::string::npos)
85 type = type_scheme_str;
92 type = type_scheme_str.substr(slash_pos + 1);
102 auto pb_msg = dccl::DynamicProtobufManager::new_protobuf_message<
103 std::shared_ptr<google::protobuf::Message>>(type);
104 google::protobuf::TextFormat::Parser parser;
105 goby::util::FlexOStreamErrorCollector error_collector(cfg.value());
106 parser.RecordErrorsTo(&error_collector);
107 parser.AllowPartialMessage(
false);
108 parser.ParseFromString(cfg.value(), pb_msg.get());
125 auto j = nlohmann::json::parse(cfg.value());
126 if (type.empty() || type ==
"nlohmann::json")
128 interprocess.template publish_dynamic<nlohmann::json>(j,
group);
161 InterprocessType& interprocess,
const SubscribeConfig& cfg,
162 std::map<
int, std::unique_ptr<goby::middleware::log::LogPlugin>>& plugins,
163 const std::string& internal_group_regex =
"")
167 if (cfg.has_scheme())
174 std::make_unique<goby::middleware::log::ProtobufPlugin>();
176 std::make_unique<goby::middleware::log::DCCLPlugin>();
178 std::make_unique<goby::middleware::log::JSONPlugin>();
180 interprocess.subscribe_regex(
181 [&cfg, &plugins, internal_group_regex](
const std::vector<unsigned char>& bytes,
int scheme,
182 const std::string& type,
185 if (!internal_group_regex.empty())
187 std::regex exclude_pattern(internal_group_regex);
188 if (std::regex_match(std::string(group), exclude_pattern) &&
189 !cfg.include_internal_groups())
194 std::string debug_text;
196 auto plugin = plugins.find(log_entry.scheme());
197 if (plugin == plugins.end())
199 debug_text = std::string(
"Message of " + std::to_string(bytes.size()) +
" bytes");
205 debug_text = plugin->second->debug_text_message(log_entry);
209 debug_text =
"Unable to parse message of " +
210 std::to_string(log_entry.data().size()) +
211 " bytes. Reason: " + e.what();
216 std::cout <<
scheme <<
" | " <<
group <<
" | " << type <<
" | "
217 << goby::time::convert<boost::posix_time::ptime>(log_entry.timestamp())
218 <<
" | " << debug_text << std::endl;
220 schemes, cfg.type_regex(), cfg.group_regex());