Goby v2
goby_store_server.cpp
1 // Copyright 2009-2018 Toby Schneider (http://gobysoft.org/index.wt/people/toby)
2 // GobySoft, LLC (2013-)
3 // Massachusetts Institute of Technology (2007-2014)
4 //
5 //
6 // This file is part of the Goby Underwater Autonomy Project Binaries
7 // ("The Goby Binaries").
8 //
9 // The Goby Binaries are free software: you can redistribute them and/or modify
10 // them under the terms of the GNU General Public License as published by
11 // the Free Software Foundation, either version 2 of the License, or
12 // (at your option) any later version.
13 //
14 // The Goby Binaries are distributed in the hope that they will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 // GNU General Public License for more details.
18 //
19 // You should have received a copy of the GNU General Public License
20 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
21 
22 #include <sqlite3.h>
23 
24 #include <boost/filesystem.hpp>
25 
26 #include "goby/acomms/acomms_constants.h"
27 #include "goby/acomms/protobuf/store_server.pb.h"
28 #include "goby/common/time.h"
29 #include "goby/common/zeromq_application_base.h"
30 #include "goby/pb/protobuf_node.h"
31 #include "goby/util/binary.h"
32 #include "goby_store_server_config.pb.h"
33 
34 using namespace goby::common::logger;
36 
37 namespace goby
38 {
39 namespace acomms
40 {
43 {
44  public:
47  {
48  if (db_)
49  sqlite3_close(db_);
50  }
51 
52  private:
53  void handle_request(const protobuf::StoreServerRequest& request);
54  void loop();
55 
56  void check(int rc, const std::string& error_prefix);
57 
58  private:
59  static goby::common::ZeroMQService zeromq_service_;
61  sqlite3* db_;
62 
63  // maps modem_id to time (microsecs since UNIX)
64  std::map<int, uint64> last_request_time_;
65 };
66 } // namespace acomms
67 } // namespace goby
68 
69 goby::common::ZeroMQService goby::acomms::GobyStoreServer::zeromq_service_;
70 
71 int main(int argc, char* argv[])
72 {
74  goby::run<goby::acomms::GobyStoreServer>(argc, argv, &cfg);
75 }
76 
77 goby::acomms::GobyStoreServer::GobyStoreServer(protobuf::GobyStoreServerConfig* cfg)
78  : ZeroMQApplicationBase(&zeromq_service_, cfg), StaticProtobufNode(&zeromq_service_),
79  cfg_(*cfg), db_(0)
80 {
81  // create database
82  if (!boost::filesystem::exists(cfg_.db_file_dir()))
83  throw(goby::Exception("db_file_dir does not exist: " + cfg_.db_file_dir()));
84 
85  std::string full_db_name = cfg_.db_file_dir() + "/";
86  if (cfg_.has_db_file_name())
87  full_db_name += cfg_.db_file_name();
88  else
89  full_db_name += "goby_store_server_" + goby::common::goby_file_timestamp() + ".db";
90 
91  int rc;
92  rc = sqlite3_open(full_db_name.c_str(), &db_);
93  if (rc)
94  throw(goby::Exception("Can't open database: " + std::string(sqlite3_errmsg(db_))));
95 
96  // initial tables
97  char* errmsg;
98  rc = sqlite3_exec(db_,
99  "CREATE TABLE IF NOT EXISTS ModemTransmission (id INTEGER PRIMARY KEY ASC "
100  "AUTOINCREMENT, src INTEGER, dest INTEGER, microtime INTEGER, bytes BLOB);",
101  0, 0, &errmsg);
102 
103  if (rc != SQLITE_OK)
104  {
105  std::string error(errmsg);
106  sqlite3_free(errmsg);
107 
108  throw(goby::Exception("SQL error: " + error));
109  }
110 
111  // set up receiving requests
112  on_receipt<protobuf::StoreServerRequest>(cfg_.reply_socket().socket_id(),
113  &GobyStoreServer::handle_request, this);
114 
115  // start zeromqservice
117  service_cfg.add_socket()->CopyFrom(cfg_.reply_socket());
118  zeromq_service_.set_cfg(service_cfg);
119 }
120 
121 void goby::acomms::GobyStoreServer::loop() {}
122 
123 void goby::acomms::GobyStoreServer::handle_request(const protobuf::StoreServerRequest& request)
124 {
125  glog.is(DEBUG1) && glog << "Got request: " << request.DebugString() << std::endl;
126 
127  uint64 request_time = goby_time<uint64>();
128 
130  response.set_modem_id(request.modem_id());
131 
132  // insert any rows into the table
133  for (int i = 0, n = request.outbox_size(); i < n; ++i)
134  {
135  glog.is(DEBUG1) && glog << "Trying to insert (size: " << request.outbox(i).ByteSize()
136  << "): " << request.outbox(i).DebugString() << std::endl;
137 
138  sqlite3_stmt* insert;
139 
140  check(
141  sqlite3_prepare(
142  db_,
143  "INSERT INTO ModemTransmission (src, dest, microtime, bytes) VALUES (?, ?, ?, ?);",
144  -1, &insert, 0),
145  "Insert statement preparation failed");
146  check(sqlite3_bind_int(insert, 1, request.outbox(i).src()), "Insert `src` binding failed");
147  check(sqlite3_bind_int(insert, 2, request.outbox(i).dest()),
148  "Insert `dest` binding failed");
149  check(sqlite3_bind_int64(insert, 3, goby_time<uint64>()),
150  "Insert `microtime` binding failed");
151 
152  std::string bytes;
153  request.outbox(i).SerializeToString(&bytes);
154 
155  // glog.is(DEBUG1) && glog << "Bytes (hex): " << goby::util::hex_encode(bytes) << std::endl;
156 
157  check(sqlite3_bind_blob(insert, 4, bytes.data(), bytes.size(), SQLITE_STATIC),
158  "Insert `bytes` binding failed");
159 
160  check(sqlite3_step(insert), "Insert step failed");
161  check(sqlite3_finalize(insert), "Insert statement finalize failed");
162 
163  glog.is(DEBUG1) && glog << "Insert successful." << std::endl;
164  }
165 
166  // find any rows to respond with
167  glog.is(DEBUG1) && glog << "Trying to select for dest: " << request.modem_id() << std::endl;
168 
169  if (!last_request_time_.count(request.modem_id()))
170  last_request_time_.insert(std::make_pair(request.modem_id(), 0));
171 
172  sqlite3_stmt* select;
173  check(sqlite3_prepare(db_,
174  "SELECT bytes FROM ModemTransmission WHERE src != ?1 AND (microtime > ?2 "
175  "AND microtime <= ?3 );",
176  -1, &select, 0),
177  "Select statement preparation failed");
178 
179  check(sqlite3_bind_int(select, 1, request.modem_id()),
180  "Select request modem_id binding failed");
181  check(sqlite3_bind_int64(select, 2, last_request_time_[request.modem_id()]),
182  "Select `microtime` last time binding failed");
183  check(sqlite3_bind_int64(select, 3, request_time),
184  "Select `microtime` this time binding failed");
185 
186  int rc = sqlite3_step(select);
187  while (rc == SQLITE_ROW)
188  {
189  switch (rc)
190  {
191  case SQLITE_ROW:
192  {
193  const unsigned char* bytes = sqlite3_column_text(select, 0);
194  int num_bytes = sqlite3_column_bytes(select, 0);
195 
196  // std::string byte_string(reinterpret_cast<const char*>(bytes), num_bytes);
197 
198  // glog.is(DEBUG1) && glog << "Bytes (hex): " << goby::util::hex_encode(byte_string) << std::endl;
199 
200  response.add_inbox()->ParseFromArray(bytes, num_bytes);
201  glog.is(DEBUG1) && glog << "Got message for inbox (size: " << num_bytes << "): "
202  << response.inbox(response.inbox_size() - 1).DebugString()
203  << std::endl;
204  rc = sqlite3_step(select);
205  }
206  break;
207 
208  default: check(rc, "Select step failed"); break;
209  }
210  }
211 
212  check(sqlite3_finalize(select), "Select statement finalize failed");
213  glog.is(DEBUG1) && glog << "Select successful." << std::endl;
214 
215  last_request_time_[request.modem_id()] = request_time;
216 
217  send(response, cfg_.reply_socket().socket_id());
218 }
219 
220 void goby::acomms::GobyStoreServer::check(int rc, const std::string& error_prefix)
221 {
222  if (rc != SQLITE_OK && rc != SQLITE_DONE)
223  throw(goby::Exception(error_prefix + ": " + std::string(sqlite3_errmsg(db_))));
224 }
uint64 goby_time< uint64 >()
Returns current UTC time as integer microseconds since 1970-01-01 00:00:00.
Definition: time.h:113
ReturnType goby_time()
Returns current UTC time as a boost::posix_time::ptime.
Definition: time.h:104
std::string goby_file_timestamp()
ISO string representation of goby_time()
Definition: time.h:156
common::FlexOstream glog
Access the Goby logger through this object.
The global namespace for the Goby project.
google::protobuf::uint64 uint64
an unsigned 64 bit integer
simple exception class for goby applications
Definition: exception.h:32