Goby3 3.2.3
2025.05.13
Loading...
Searching...
No Matches
protobuf_log_plugin.h
Go to the documentation of this file.
1// Copyright 2019-2025:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6//
7//
8// This file is part of the Goby Underwater Autonomy Project Libraries
9// ("The Goby Libraries").
10//
11// The Goby Libraries are free software: you can redistribute them and/or modify
12// them under the terms of the GNU Lesser General Public License as published by
13// the Free Software Foundation, either version 2.1 of the License, or
14// (at your option) any later version.
15//
16// The Goby Libraries are distributed in the hope that they will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU Lesser General Public License for more details.
20//
21// You should have received a copy of the GNU Lesser General Public License
22// along with Goby. If not, see <http://www.gnu.org/licenses/>.
23
24#ifndef GOBY_MIDDLEWARE_LOG_PROTOBUF_LOG_PLUGIN_H
25#define GOBY_MIDDLEWARE_LOG_PROTOBUF_LOG_PLUGIN_H
26
27#include <google/protobuf/util/json_util.h>
28
29#include "goby/middleware/log.h"
32#include "goby/time/convert.h"
34#include "log_plugin.h"
35
// Compatibility shim: when GOOGLE_PROTOBUF_VERSION is below 3001000, every later use of
// the identifier ByteSizeLong in this translation unit is textually replaced by ByteSize.
36#if GOOGLE_PROTOBUF_VERSION < 3001000
37#define ByteSizeLong ByteSize
38#endif
39
40namespace goby
41{
42namespace middleware
43{
44namespace log
45{
/// Group named "goby::log::ProtobufFileDescriptor"; it forms part of the
/// LogEntry::filter_hook key registered by register_read_hooks().
46constexpr goby::middleware::Group file_desc_group{"goby::log::ProtobufFileDescriptor"};
47
49template <int scheme> class ProtobufPluginBase : public LogPlugin
50{
53 "Scheme must be PROTOBUF or DCCL");
54
55 public:
56 ProtobufPluginBase(bool user_pool_first) : user_pool_first_(user_pool_first) {}
57
58 std::string debug_text_message(LogEntry& log_entry) override
59 {
60 auto msgs = parse_message(log_entry);
61
62 std::stringstream ss;
63 for (typename decltype(msgs)::size_type i = 0, n = msgs.size(); i < n; ++i)
64 {
65 if (n > 1)
66 ss << "[" << i << "]";
67 ss << msgs[i]->ShortDebugString();
68 }
69 return ss.str();
70 }
71
/// Builds one HDF5ProtobufEntry per Protobuf message parsed from log_entry.
/// Each entry receives the log entry's group as its channel, the log entry's timestamp
/// converted to the type of the entry's time field, the template parameter scheme, and
/// the parsed message itself.
72 std::vector<goby::middleware::HDF5ProtobufEntry> hdf5_entry(LogEntry& log_entry) override
73 {
74 std::vector<goby::middleware::HDF5ProtobufEntry> hdf5_entries;
75 auto msgs = parse_message(log_entry);
76
// msg is taken by value, so each iteration copies the element of msgs
77 for (auto msg : msgs)
78 {
79 hdf5_entries.emplace_back();
// NOTE(review): the listing skips source line 80 here, so the declaration of the local
// hdf5_entry is not visible — presumably a reference to the element just emplaced;
// confirm against the original header.
81 hdf5_entry.channel = log_entry.group();
82 hdf5_entry.time = goby::time::convert<decltype(hdf5_entry.time)>(log_entry.timestamp());
83 hdf5_entry.scheme = scheme;
84 hdf5_entry.msg = msg;
85 }
86 return hdf5_entries;
87 }
88
89 std::shared_ptr<nlohmann::json> json_message(LogEntry& log_entry) override
90 {
91 auto j = std::make_shared<nlohmann::json>();
92 auto msgs = parse_message(log_entry);
93
94 for (typename decltype(msgs)::size_type i = 0, n = msgs.size(); i < n; ++i)
95 {
96 std::string jstr;
97 google::protobuf::util::MessageToJsonString(*msgs[i], &jstr);
98
99 if (n > 1)
100 (*j)[n] = nlohmann::json::parse(jstr);
101 else
102 (*j) = nlohmann::json::parse(jstr);
103 }
104 return j;
105 }
106
107 void register_read_hooks(const std::ifstream& in_log_file) override
108 {
109 LogEntry::filter_hook[{static_cast<int>(scheme), static_cast<std::string>(file_desc_group),
110 google::protobuf::FileDescriptorProto::descriptor()->full_name()}] =
111 [&](const std::vector<unsigned char>& data)
112 {
113 google::protobuf::FileDescriptorProto file_desc_proto;
114 file_desc_proto.ParseFromArray(&data[0], data.size());
115
116 if (!read_file_desc_names_.count(file_desc_proto.name()))
117 {
118 goby::glog.is_debug1() && goby::glog << "Adding: " << file_desc_proto.name()
119 << std::endl;
120
121 dccl::DynamicProtobufManager::add_protobuf_file(file_desc_proto);
122 read_file_desc_names_.insert(file_desc_proto.name());
123 }
124 };
125 }
126
127 void register_write_hooks(std::ofstream& out_log_file) override
128 {
129 LogEntry::new_type_hook[scheme] = [&](const std::string& type)
130 { add_new_protobuf_type(type, out_log_file); };
131 }
132
/// Parses the payload of log_entry into zero or more Protobuf messages.
/// The loop runs until actual_end reaches the end of the payload; after each parse the
/// next one starts at the position the previous one reported in actual_end. Each parsed
/// message is passed to find_unknown_fields() before being appended to the result.
/// Any std::exception raised inside the loop body is rethrown as log::LogException
/// carrying the entry's type name and the original what() text.
133 std::vector<std::shared_ptr<google::protobuf::Message>> parse_message(LogEntry& log_entry)
134 {
135 std::vector<std::shared_ptr<google::protobuf::Message>> msgs;
136
137 const auto& data = log_entry.data();
// actual_end starts at begin(), so an empty payload skips the loop entirely
138 auto bytes_begin = data.begin(), bytes_end = data.end(), actual_end = data.begin();
139
140 while (actual_end != bytes_end)
141 {
142 try
143 {
// NOTE(review): the listing skips source line 144 here, so the start of the statement
// that declares msg and names the parse function is not visible — presumably that call
// advances actual_end; confirm against the original header.
145 bytes_begin, bytes_end, actual_end, log_entry.type(), user_pool_first_);
146
147 find_unknown_fields(*msg);
148 msgs.push_back(msg);
149 }
150 catch (std::exception& e)
151 {
152 throw(log::LogException("Failed to create Protobuf message of type: " +
153 log_entry.type() + ", reason: " + e.what()));
154 }
// continue with the bytes that follow the message just parsed
155 bytes_begin = actual_end;
156 }
157
158 return msgs;
159 }
160
161 private:
/// Writes the FileDescriptorProto for file_desc to out_log_file, once per descriptor.
/// If file_desc is not yet in written_file_desc_, the function first recurses into every
/// dependency of the file, then records file_desc as written, copies it into a
/// FileDescriptorProto, serializes that into a byte vector and serializes an entry to
/// out_log_file. Otherwise it only emits a "Skipping already written" log line.
162 void insert_protobuf_file_desc(const google::protobuf::FileDescriptor* file_desc,
163 std::ofstream& out_log_file)
164 {
165 if (written_file_desc_.count(file_desc) == 0)
166 {
// dependencies are handled before the file itself
167 for (int i = 0, n = file_desc->dependency_count(); i < n; ++i)
168 insert_protobuf_file_desc(file_desc->dependency(i), out_log_file);
169
// NOTE(review): the listing skips source line 170 here — presumably the verbosity guard
// for the log statement below; confirm against the original header.
171 goby::glog << "Inserting file descriptor proto for: " << file_desc->name() << " : "
172 << file_desc << std::endl;
173
174 written_file_desc_.insert(file_desc);
175
176 google::protobuf::FileDescriptorProto file_desc_proto;
177 file_desc->CopyTo(&file_desc_proto);
// buffer is sized to the serialized length before serializing into it
178 std::vector<unsigned char> data(file_desc_proto.ByteSizeLong());
179 file_desc_proto.SerializeToArray(&data[0], data.size());
// NOTE(review): the listing skips source lines 180 and 182 here, so the declaration of
// entry (and its remaining constructor arguments) is not visible; confirm against the
// original header.
181 google::protobuf::FileDescriptorProto::descriptor()->full_name(),
183 entry.serialize(&out_log_file);
184 }
185 else
186 {
// NOTE(review): the listing skips source line 187 here — presumably the start of the
// log statement continued below; confirm against the original header.
188 << "Skipping already written file descriptor proto for: "
189 << file_desc->name() << std::endl;
190 }
191 }
192
193 void add_new_protobuf_type(const std::string& protobuf_type, std::ofstream& out_log_file)
194 {
195 const google::protobuf::Descriptor* desc =
196 dccl::DynamicProtobufManager::find_descriptor(protobuf_type);
197 if (!desc)
198 {
199 goby::glog.is_warn() && goby::glog << "Unknown protobuf type: " << protobuf_type
200 << std::endl;
201 }
202 else
203 {
204 add_new_protobuf_type(desc, out_log_file);
205 }
206 }
207
208 void add_new_protobuf_type(const google::protobuf::Descriptor* desc,
209 std::ofstream& out_log_file)
210 {
211 if (written_desc_.count(desc) == 0)
212 {
213 goby::glog.is_debug1() && goby::glog << "Add new protobuf type: " << desc->full_name()
214 << std::endl;
215
216 insert_protobuf_file_desc(desc->file(), out_log_file);
217
218 auto insert_extensions =
219 [this, &out_log_file](
220 const std::vector<const google::protobuf::FieldDescriptor*> extensions)
221 {
222 for (const auto* field_desc : extensions)
223 {
224 insert_protobuf_file_desc(field_desc->file(), out_log_file);
225 }
226 };
227
228 std::vector<const google::protobuf::FieldDescriptor*> generated_extensions;
229 google::protobuf::DescriptorPool::generated_pool()->FindAllExtensions(
230 desc, &generated_extensions);
231
232 for (const auto* desc : generated_extensions)
233 goby::glog.is_debug1() && goby::glog << "Found generated extension ["
234 << desc->number() << "]: " << desc->full_name()
235 << " in file: " << desc->file()->name()
236 << std::endl;
237
238 insert_extensions(generated_extensions);
239
240 std::vector<const google::protobuf::FieldDescriptor*> user_extensions;
241
242#ifdef DCCL_VERSION_4_1_OR_NEWER
243 dccl::DynamicProtobufManager::user_descriptor_pool_call(
244 &google::protobuf::DescriptorPool::FindAllExtensions, desc, &user_extensions);
245#else
246 dccl::DynamicProtobufManager::user_descriptor_pool().FindAllExtensions(
247 desc, &user_extensions);
248#endif
249
250 for (const auto* desc : user_extensions)
251 goby::glog.is_debug1() && goby::glog << "Found user extension [" << desc->number()
252 << "]: " << desc->full_name()
253 << " in file: " << desc->file()->name()
254 << std::endl;
255 insert_extensions(user_extensions);
256
257 written_desc_.insert(desc);
258
259 // recursively add children
260 for (auto i = 0, n = desc->field_count(); i < n; ++i)
261 {
262 const auto* field_desc = desc->field(i);
263 if (field_desc->cpp_type() == google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE)
264 add_new_protobuf_type(field_desc->message_type(), out_log_file);
265 }
266 }
267 }
268
/// Reports unknown fields in msg and, recursively, in every message-typed field of msg.
/// If the message's UnknownFieldSet is non-empty, the unknown field numbers are joined
/// into a space-separated string and logged together with the message's full type name.
/// The function then visits each field of the descriptor and recurses into singular and
/// repeated message-typed fields.
269 void find_unknown_fields(const google::protobuf::Message& msg)
270 {
271 const auto* refl = msg.GetReflection();
272 const auto* desc = msg.GetDescriptor();
273
274 const google::protobuf::UnknownFieldSet& unknown_fields = refl->GetUnknownFields(msg);
275 if (!unknown_fields.empty())
276 {
277 std::string unknown_field_numbers;
278 for (int i = 0, n = unknown_fields.field_count(); i < n; ++i)
279 {
280 const auto& unknown_field = unknown_fields.field(i);
281 unknown_field_numbers += std::to_string(unknown_field.number()) + " ";
282 }
283
// NOTE(review): the listing skips source line 284 here — presumably the verbosity guard
// for the log statement below; confirm against the original header.
285 goby::glog << "Unknown fields found in " << desc->full_name() << ": "
286 << unknown_field_numbers
287 << ", ensure all extensions are loaded using load_shared_library"
288 << std::endl;
289 }
290 for (auto i = 0, n = desc->field_count(); i < n; ++i)
291 {
292 const auto* field_desc = desc->field(i);
293
294 if (field_desc->cpp_type() == google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE)
295 {
296 if (field_desc->is_repeated())
297 {
// this inner i/n pair shadows the outer loop's i/n for the duration of the inner loop
298 for (int i = 0, n = refl->FieldSize(msg, field_desc); i < n; ++i)
299 {
300 find_unknown_fields(refl->GetRepeatedMessage(msg, field_desc, i));
301 }
302 }
303 else
304 {
305 find_unknown_fields(refl->GetMessage(msg, field_desc));
306 }
307 }
308 }
309 }
310
311 private:
312 std::set<const google::protobuf::FileDescriptor*> written_file_desc_;
313 std::set<const google::protobuf::Descriptor*> written_desc_;
314 std::set<std::string> read_file_desc_names_;
315 bool user_pool_first_;
316};
317
318class ProtobufPlugin : public ProtobufPluginBase<goby::middleware::MarshallingScheme::PROTOBUF>
319{
320 public:
321 ProtobufPlugin(bool user_pool_first = false)
322 : ProtobufPluginBase<goby::middleware::MarshallingScheme::PROTOBUF>(user_pool_first)
323 {
324 }
325};
326
327} // namespace log
328} // namespace middleware
329} // namespace goby
330
331#endif
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:60
const goby::time::SystemClock::time_point & timestamp() const
Definition log_entry.h:152
const std::string & type() const
Definition log_entry.h:150
const Group & group() const
Definition log_entry.h:151
static std::map< int, std::function< void(const std::string &type)> > new_type_hook
Definition log_entry.h:119
static std::map< LogFilter, std::function< void(const std::vector< unsigned char > &data)> > filter_hook
Definition log_entry.h:123
const std::vector< unsigned char > & data() const
Definition log_entry.h:148
Implements hooks for Protobuf metadata.
void register_read_hooks(const std::ifstream &in_log_file) override
std::vector< std::shared_ptr< google::protobuf::Message > > parse_message(LogEntry &log_entry)
void register_write_hooks(std::ofstream &out_log_file) override
std::vector< goby::middleware::HDF5ProtobufEntry > hdf5_entry(LogEntry &log_entry) override
std::string debug_text_message(LogEntry &log_entry) override
std::shared_ptr< nlohmann::json > json_message(LogEntry &log_entry) override
ProtobufPlugin(bool user_pool_first=false)
constexpr goby::middleware::Group file_desc_group
constexpr int scheme()
Placeholder to provide an interface for the scheme() function family.
Definition cstr.h:65
ToTimeType convert(FromTimeType from_time)
Convert between time representations (this function works for tautological conversions)
Definition convert.h:49
The global namespace for the Goby project.
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::PROTOBUF_NAMESPACE_ID::MessageOptions, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
util::FlexOstream glog
Access the Goby logger through this object.
STL namespace.
Represents an entry in a HDF5 scientific data file converted from a Google Protocol Buffers message.
Definition hdf5_plugin.h:42
std::string channel
Channel (or Group) name.
Definition hdf5_plugin.h:44
Enumeration and helper functions for marshalling scheme identification.
Definition interface.h:46
static std::shared_ptr< DataType > parse(CharIterator bytes_begin, CharIterator bytes_end, CharIterator &actual_end, const std::string &type=type_name())
Given a beginning and end iterator to bytes, parse the data and return it.
Definition interface.h:129