Goby3  3.1.4
2024.02.22
protobuf_log_plugin.h
Go to the documentation of this file.
1 // Copyright 2019-2023:
2 // GobySoft, LLC (2013-)
3 // Community contributors (see AUTHORS file)
4 // File authors:
5 // Toby Schneider <toby@gobysoft.org>
6 //
7 //
8 // This file is part of the Goby Underwater Autonomy Project Libraries
9 // ("The Goby Libraries").
10 //
11 // The Goby Libraries are free software: you can redistribute them and/or modify
12 // them under the terms of the GNU Lesser General Public License as published by
13 // the Free Software Foundation, either version 2.1 of the License, or
14 // (at your option) any later version.
15 //
16 // The Goby Libraries are distributed in the hope that they will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU Lesser General Public License for more details.
20 //
21 // You should have received a copy of the GNU Lesser General Public License
22 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
23 
24 #ifndef GOBY_MIDDLEWARE_LOG_PROTOBUF_LOG_PLUGIN_H
25 #define GOBY_MIDDLEWARE_LOG_PROTOBUF_LOG_PLUGIN_H
26 
27 #include <google/protobuf/util/json_util.h>
28 
29 #include "goby/middleware/log.h"
32 #include "goby/time/convert.h"
33 #include "goby/util/dccl_compat.h"
34 #include "log_plugin.h"
35 
36 #if GOOGLE_PROTOBUF_VERSION < 3001000
37 #define ByteSizeLong ByteSize
38 #endif
39 
40 namespace goby
41 {
42 namespace middleware
43 {
44 namespace log
45 {
46 constexpr goby::middleware::Group file_desc_group{"goby::log::ProtobufFileDescriptor"};
47 
49 template <int scheme> class ProtobufPluginBase : public LogPlugin
50 {
53  "Scheme must be PROTOBUF or DCCL");
54 
55  public:
56  ProtobufPluginBase(bool user_pool_first) : user_pool_first_(user_pool_first) {}
57 
58  std::string debug_text_message(LogEntry& log_entry) override
59  {
60  auto msgs = parse_message(log_entry);
61 
62  std::stringstream ss;
63  for (typename decltype(msgs)::size_type i = 0, n = msgs.size(); i < n; ++i)
64  {
65  if (n > 1)
66  ss << "[" << i << "]";
67  ss << msgs[i]->ShortDebugString();
68  }
69  return ss.str();
70  }
71 
72  std::vector<goby::middleware::HDF5ProtobufEntry> hdf5_entry(LogEntry& log_entry) override
73  {
74  std::vector<goby::middleware::HDF5ProtobufEntry> hdf5_entries;
75  auto msgs = parse_message(log_entry);
76 
77  for (auto msg : msgs)
78  {
79  hdf5_entries.emplace_back();
80  goby::middleware::HDF5ProtobufEntry& hdf5_entry = hdf5_entries.back();
81  hdf5_entry.channel = log_entry.group();
82  hdf5_entry.time = goby::time::convert<decltype(hdf5_entry.time)>(log_entry.timestamp());
83  hdf5_entry.scheme = scheme;
84  hdf5_entry.msg = msg;
85  }
86  return hdf5_entries;
87  }
88 
89  std::shared_ptr<nlohmann::json> json_message(LogEntry& log_entry) override
90  {
91  auto j = std::make_shared<nlohmann::json>();
92  auto msgs = parse_message(log_entry);
93 
94  for (typename decltype(msgs)::size_type i = 0, n = msgs.size(); i < n; ++i)
95  {
96  std::string jstr;
97  google::protobuf::util::MessageToJsonString(*msgs[i], &jstr);
98 
99  if (n > 1)
100  (*j)[n] = nlohmann::json::parse(jstr);
101  else
102  (*j) = nlohmann::json::parse(jstr);
103  }
104  return j;
105  }
106 
107  void register_read_hooks(const std::ifstream& in_log_file) override
108  {
109  LogEntry::filter_hook[{static_cast<int>(scheme), static_cast<std::string>(file_desc_group),
110  google::protobuf::FileDescriptorProto::descriptor()->full_name()}] =
111  [&](const std::vector<unsigned char>& data)
112  {
113  google::protobuf::FileDescriptorProto file_desc_proto;
114  file_desc_proto.ParseFromArray(&data[0], data.size());
115 
116  if (!read_file_desc_names_.count(file_desc_proto.name()))
117  {
118  goby::glog.is_debug1() && goby::glog << "Adding: " << file_desc_proto.name()
119  << std::endl;
120 
121  dccl::DynamicProtobufManager::add_protobuf_file(file_desc_proto);
122  read_file_desc_names_.insert(file_desc_proto.name());
123  }
124  };
125  }
126 
127  void register_write_hooks(std::ofstream& out_log_file) override
128  {
129  LogEntry::new_type_hook[scheme] = [&](const std::string& type)
130  { add_new_protobuf_type(type, out_log_file); };
131  }
132 
133  std::vector<std::shared_ptr<google::protobuf::Message>> parse_message(LogEntry& log_entry)
134  {
135  std::vector<std::shared_ptr<google::protobuf::Message>> msgs;
136 
137  const auto& data = log_entry.data();
138  auto bytes_begin = data.begin(), bytes_end = data.end(), actual_end = data.begin();
139 
140  while (actual_end != bytes_end)
141  {
142  try
143  {
145  bytes_begin, bytes_end, actual_end, log_entry.type(), user_pool_first_);
146 
147  find_unknown_fields(*msg);
148  msgs.push_back(msg);
149  }
150  catch (std::exception& e)
151  {
152  throw(log::LogException("Failed to create Protobuf message of type: " +
153  log_entry.type() + ", reason: " + e.what()));
154  }
155  bytes_begin = actual_end;
156  }
157 
158  return msgs;
159  }
160 
161  private:
162  void insert_protobuf_file_desc(const google::protobuf::FileDescriptor* file_desc,
163  std::ofstream& out_log_file)
164  {
165  if (written_file_desc_.count(file_desc) == 0)
166  {
167  for (int i = 0, n = file_desc->dependency_count(); i < n; ++i)
168  insert_protobuf_file_desc(file_desc->dependency(i), out_log_file);
169 
170  goby::glog.is_debug1() &&
171  goby::glog << "Inserting file descriptor proto for: " << file_desc->name() << " : "
172  << file_desc << std::endl;
173 
174  written_file_desc_.insert(file_desc);
175 
176  google::protobuf::FileDescriptorProto file_desc_proto;
177  file_desc->CopyTo(&file_desc_proto);
178  std::vector<unsigned char> data(file_desc_proto.ByteSizeLong());
179  file_desc_proto.SerializeToArray(&data[0], data.size());
181  google::protobuf::FileDescriptorProto::descriptor()->full_name(),
183  entry.serialize(&out_log_file);
184  }
185  else
186  {
188  << "Skipping already written file descriptor proto for: "
189  << file_desc->name() << std::endl;
190  }
191  }
192 
193  void add_new_protobuf_type(const std::string& protobuf_type, std::ofstream& out_log_file)
194  {
195  const google::protobuf::Descriptor* desc =
196  dccl::DynamicProtobufManager::find_descriptor(protobuf_type);
197  if (!desc)
198  {
199  goby::glog.is_warn() && goby::glog << "Unknown protobuf type: " << protobuf_type
200  << std::endl;
201  }
202  else
203  {
204  add_new_protobuf_type(desc, out_log_file);
205  }
206  }
207 
208  void add_new_protobuf_type(const google::protobuf::Descriptor* desc,
209  std::ofstream& out_log_file)
210  {
211  if (written_desc_.count(desc) == 0)
212  {
213  goby::glog.is_debug1() && goby::glog << "Add new protobuf type: " << desc->full_name()
214  << std::endl;
215 
216  insert_protobuf_file_desc(desc->file(), out_log_file);
217 
218  auto insert_extensions =
219  [this, &out_log_file](
220  const std::vector<const google::protobuf::FieldDescriptor*> extensions)
221  {
222  for (const auto* field_desc : extensions)
223  {
224  insert_protobuf_file_desc(field_desc->file(), out_log_file);
225  }
226  };
227 
228  std::vector<const google::protobuf::FieldDescriptor*> generated_extensions;
229  google::protobuf::DescriptorPool::generated_pool()->FindAllExtensions(
230  desc, &generated_extensions);
231 
232  for (const auto* desc : generated_extensions)
233  goby::glog.is_debug1() && goby::glog << "Found generated extension ["
234  << desc->number() << "]: " << desc->full_name()
235  << " in file: " << desc->file()->name()
236  << std::endl;
237 
238  insert_extensions(generated_extensions);
239 
240  std::vector<const google::protobuf::FieldDescriptor*> user_extensions;
241 
242 #ifdef DCCL_VERSION_4_1_OR_NEWER
243  dccl::DynamicProtobufManager::user_descriptor_pool_call(
244  &google::protobuf::DescriptorPool::FindAllExtensions, desc, &user_extensions);
245 #else
246  dccl::DynamicProtobufManager::user_descriptor_pool().FindAllExtensions(
247  desc, &user_extensions);
248 #endif
249 
250  for (const auto* desc : user_extensions)
251  goby::glog.is_debug1() && goby::glog << "Found user extension [" << desc->number()
252  << "]: " << desc->full_name()
253  << " in file: " << desc->file()->name()
254  << std::endl;
255  insert_extensions(user_extensions);
256 
257  written_desc_.insert(desc);
258 
259  // recursively add children
260  for (auto i = 0, n = desc->field_count(); i < n; ++i)
261  {
262  const auto* field_desc = desc->field(i);
263  if (field_desc->cpp_type() == google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE)
264  add_new_protobuf_type(field_desc->message_type(), out_log_file);
265  }
266  }
267  }
268 
269  void find_unknown_fields(const google::protobuf::Message& msg)
270  {
271  const auto* refl = msg.GetReflection();
272  const auto* desc = msg.GetDescriptor();
273 
274  const google::protobuf::UnknownFieldSet& unknown_fields = refl->GetUnknownFields(msg);
275  if (!unknown_fields.empty())
276  {
277  std::string unknown_field_numbers;
278  for (int i = 0, n = unknown_fields.field_count(); i < n; ++i)
279  {
280  const auto& unknown_field = unknown_fields.field(i);
281  unknown_field_numbers += std::to_string(unknown_field.number()) + " ";
282  }
283 
284  goby::glog.is_warn() &&
285  goby::glog << "Unknown fields found in " << desc->full_name() << ": "
286  << unknown_field_numbers
287  << ", ensure all extensions are loaded using load_shared_library"
288  << std::endl;
289  }
290  for (auto i = 0, n = desc->field_count(); i < n; ++i)
291  {
292  const auto* field_desc = desc->field(i);
293 
294  if (field_desc->cpp_type() == google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE)
295  {
296  if (field_desc->is_repeated())
297  {
298  for (int i = 0, n = refl->FieldSize(msg, field_desc); i < n; ++i)
299  {
300  find_unknown_fields(refl->GetRepeatedMessage(msg, field_desc, i));
301  }
302  }
303  else
304  {
305  find_unknown_fields(refl->GetMessage(msg, field_desc));
306  }
307  }
308  }
309  }
310 
311  private:
312  std::set<const google::protobuf::FileDescriptor*> written_file_desc_;
313  std::set<const google::protobuf::Descriptor*> written_desc_;
314  std::set<std::string> read_file_desc_names_;
315  bool user_pool_first_;
316 };
317 
318 class ProtobufPlugin : public ProtobufPluginBase<goby::middleware::MarshallingScheme::PROTOBUF>
319 {
320  public:
321  ProtobufPlugin(bool user_pool_first = false)
322  : ProtobufPluginBase<goby::middleware::MarshallingScheme::PROTOBUF>(user_pool_first)
323  {
324  }
325 };
326 
327 } // namespace log
328 } // namespace middleware
329 } // namespace goby
330 
331 #endif
goby::middleware::MarshallingScheme
Enumeration and helper functions for marshalling scheme identification.
Definition: interface.h:45
goby::middleware::SerializerParserHelper::parse
static std::shared_ptr< DataType > parse(CharIterator bytes_begin, CharIterator bytes_end, CharIterator &actual_end, const std::string &type=type_name())
Given a beginning and end iterator to bytes, parse the data and return it.
Definition: interface.h:129
goby::middleware::log::ProtobufPluginBase::parse_message
std::vector< std::shared_ptr< google::protobuf::Message > > parse_message(LogEntry &log_entry)
Definition: protobuf_log_plugin.h:133
goby::middleware::log::ProtobufPlugin::ProtobufPlugin
ProtobufPlugin(bool user_pool_first=false)
Definition: protobuf_log_plugin.h:321
goby
The global namespace for the Goby project.
Definition: acomms_constants.h:33
goby::util::FlexOstream::is_warn
bool is_warn()
Definition: flex_ostream.h:82
dccl_compat.h
protobuf.h
goby::middleware::log::LogEntry::group
const Group & group() const
Definition: log_entry.h:151
goby::middleware::log::ProtobufPluginBase
Implements hooks for Protobuf metadata.
Definition: protobuf_log_plugin.h:49
goby::util::e
constexpr T e
Definition: constants.h:35
goby::middleware::MarshallingScheme::PROTOBUF
@ PROTOBUF
Definition: interface.h:53
goby::util::FlexOstream::is_debug2
bool is_debug2()
Definition: flex_ostream.h:85
goby::middleware::log::LogEntry
Definition: log_entry.h:99
goby::time::convert
ToTimeType convert(FromTimeType from_time)
Convert between time representations (this function works for tautological conversions)
Definition: convert.h:49
log_tool_config.pb.h
goby::middleware::log::ProtobufPluginBase::register_write_hooks
void register_write_hooks(std::ofstream &out_log_file) override
Definition: protobuf_log_plugin.h:127
goby::util::FlexOstream::is_debug1
bool is_debug1()
Definition: flex_ostream.h:84
goby::middleware::log::file_desc_group
constexpr goby::middleware::Group file_desc_group
Definition: protobuf_log_plugin.h:46
goby::middleware::log::LogEntry::type
const std::string & type() const
Definition: log_entry.h:150
goby::middleware::log::LogEntry::new_type_hook
static std::map< int, std::function< void(const std::string &type)> > new_type_hook
Definition: log_entry.h:119
goby::middleware::log::LogEntry::timestamp
const goby::time::SystemClock::time_point & timestamp() const
Definition: log_entry.h:152
to_string
NLOHMANN_BASIC_JSON_TPL_DECLARATION std::string to_string(const NLOHMANN_BASIC_JSON_TPL &j)
user-defined to_string function for JSON values
Definition: json.hpp:24301
log_plugin.h
goby::middleware::log::LogEntry::data
const std::vector< unsigned char > & data() const
Definition: log_entry.h:148
jwt::json::type
type
Generic JSON types used in JWTs.
Definition: jwt.h:2071
goby::msg
extern ::google::protobuf::internal::ExtensionIdentifier< ::google::protobuf::MessageOptions, ::google::protobuf::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
Definition: option_extensions.pb.h:1327
convert.h
goby::middleware::Group
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition: group.h:58
google::protobuf::Message
Definition: message.h:189
goby::middleware::log::ProtobufPluginBase::json_message
std::shared_ptr< nlohmann::json > json_message(LogEntry &log_entry) override
Definition: protobuf_log_plugin.h:89
log.h
goby::middleware::log::LogEntry::filter_hook
static std::map< LogFilter, std::function< void(const std::vector< unsigned char > &data)> > filter_hook
Definition: log_entry.h:123
goby::middleware::MarshallingScheme::DCCL
@ DCCL
Definition: interface.h:54
goby::middleware::log::LogPlugin
Definition: log_plugin.h:40
goby::middleware::scheme
constexpr int scheme()
Placeholder to provide an interface for the scheme() function family.
Definition: cstr.h:65
goby::middleware::log::ProtobufPluginBase::hdf5_entry
std::vector< goby::middleware::HDF5ProtobufEntry > hdf5_entry(LogEntry &log_entry) override
Definition: protobuf_log_plugin.h:72
goby::middleware::log::ProtobufPlugin
Definition: protobuf_log_plugin.h:318
goby::glog
util::FlexOstream glog
Access the Goby logger through this object.
goby::middleware::log::ProtobufPluginBase::debug_text_message
std::string debug_text_message(LogEntry &log_entry) override
Definition: protobuf_log_plugin.h:58
goby::middleware::HDF5ProtobufEntry
Represents an entry in a HDF5 scientific data file converted from a Google Protocol Buffers message.
Definition: hdf5_plugin.h:41
goby::middleware::log::ProtobufPluginBase::ProtobufPluginBase
ProtobufPluginBase(bool user_pool_first)
Definition: protobuf_log_plugin.h:56
goby::middleware::log::LogException
Definition: log_entry.h:48
goby::middleware::log::ProtobufPluginBase::register_read_hooks
void register_read_hooks(const std::ifstream &in_log_file) override
Definition: protobuf_log_plugin.h:107