Goby v2
file_transfer.cpp
1 // Copyright 2009-2018 Toby Schneider (http://gobysoft.org/index.wt/people/toby)
2 // GobySoft, LLC (2013-)
3 // Massachusetts Institute of Technology (2007-2014)
4 //
5 //
6 // This file is part of the Goby Underwater Autonomy Project Binaries
7 // ("The Goby Binaries").
8 //
9 // The Goby Binaries are free software: you can redistribute them and/or modify
10 // them under the terms of the GNU General Public License as published by
11 // the Free Software Foundation, either version 2 of the License, or
12 // (at your option) any later version.
13 //
14 // The Goby Binaries are distributed in the hope that they will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 // GNU General Public License for more details.
18 //
19 // You should have received a copy of the GNU General Public License
20 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
21 
22 #include <fstream>
23 #include <iostream>
24 
25 #include "goby/common/logger.h"
26 #include "goby/common/logger/term_color.h"
27 #include "goby/common/zeromq_service.h"
28 
29 #include "goby/pb/application.h"
30 
31 #include "file_transfer_config.pb.h"
32 #include "goby/acomms/protobuf/file_transfer.pb.h"
33 
34 using namespace goby::common::logger;
35 using goby::glog;
36 
37 namespace goby
38 {
39 namespace acomms
40 {
42 {
43  public:
45  ~FileTransfer();
46 
47  private:
48  void loop() {}
49 
50  void push_file();
51  void pull_file();
52 
53  int send_file(const std::string& path);
54 
55  void handle_remote_transfer_request(const protobuf::TransferRequest& request);
56  void handle_receive_fragment(const protobuf::FileFragment& fragment);
57 
58  void handle_receive_response(const protobuf::TransferResponse& response);
59 
60  void handle_ack(const protobuf::TransferRequest& request)
61  {
62  std::cout << "Got ack for request: " << request.DebugString() << std::endl;
63  waiting_for_request_ack_ = false;
64  }
65 
66  private:
68 
69  enum
70  {
71  MAX_FILE_TRANSFER_BYTES = 1024 * 1024
72  };
73 
74  typedef int ModemId;
75 
76  std::map<ModemId, std::map<int, protobuf::FileFragment> > receive_files_;
77  std::map<ModemId, protobuf::TransferRequest> requests_;
78  bool waiting_for_request_ack_;
79 };
80 } // namespace acomms
81 } // namespace goby
82 
83 int main(int argc, char* argv[])
84 {
86  goby::run<goby::acomms::FileTransfer>(argc, argv, &cfg);
87 }
88 
89 using goby::glog;
90 
91 goby::acomms::FileTransfer::FileTransfer(protobuf::FileTransferConfig* cfg)
92  : Application(cfg), cfg_(*cfg), waiting_for_request_ack_(false)
93 {
94  glog.is(DEBUG1) && glog << cfg_.DebugString() << std::endl;
95 
96  if (cfg_.action() != protobuf::FileTransferConfig::WAIT)
97  {
98  if (!cfg_.has_remote_id())
99  {
100  glog.is(WARN) && glog << "Must set remote_id modem ID for file destination."
101  << std::endl;
102  exit(EXIT_FAILURE);
103  }
104  if (!cfg_.has_local_file())
105  {
106  glog.is(WARN) && glog << "Must set local_file path." << std::endl;
107  exit(EXIT_FAILURE);
108  }
109  if (!cfg_.has_remote_id())
110  {
111  glog.is(WARN) && glog << "Must set remote_file path." << std::endl;
112  exit(EXIT_FAILURE);
113  }
114 
115  const unsigned max_path = protobuf::TransferRequest::descriptor()
116  ->FindFieldByName("file")
117  ->options()
118  .GetExtension(dccl::field)
119  .max_length();
120  if (cfg_.remote_file().size() > max_path)
121  {
122  glog.is(WARN) && glog << "remote_file full path must be less than " << max_path
123  << " characters." << std::endl;
124  exit(EXIT_FAILURE);
125  }
126  }
127 
128  subscribe(&FileTransfer::handle_ack, this,
129  "QueueAckOrig" + goby::util::as<std::string>(cfg_.local_id()));
130 
131  subscribe(&FileTransfer::handle_remote_transfer_request, this,
132  "QueueRx" + goby::util::as<std::string>(cfg_.local_id()));
133 
134  subscribe(&FileTransfer::handle_receive_fragment, this,
135  "QueueRx" + goby::util::as<std::string>(cfg_.local_id()));
136 
137  subscribe(&FileTransfer::handle_receive_response, this,
138  "QueueRx" + goby::util::as<std::string>(cfg_.local_id()));
139 
140  try
141  {
142  if (cfg_.action() == protobuf::FileTransferConfig::PUSH)
143  push_file();
144  else if (cfg_.action() == protobuf::FileTransferConfig::PULL)
145  pull_file();
146  }
147  catch (protobuf::TransferResponse::ErrorCode& c)
148  {
149  glog.is(WARN) && glog << "File transfer action failed: "
150  << protobuf::TransferResponse::ErrorCode_Name(c) << std::endl;
151  if (!cfg_.daemon())
152  exit(EXIT_FAILURE);
153  }
154  catch (std::exception& e)
155  {
156  glog.is(WARN) && glog << "File transfer action failed: " << e.what() << std::endl;
157  if (!cfg_.daemon())
158  exit(EXIT_FAILURE);
159  }
160 }
161 
162 void goby::acomms::FileTransfer::push_file()
163 {
165  request.set_src(cfg_.local_id());
166  request.set_dest(cfg_.remote_id());
167  request.set_push_or_pull(protobuf::TransferRequest::PUSH);
168  request.set_file(cfg_.remote_file());
169 
170  publish(request, "QueuePush" + goby::util::as<std::string>(cfg_.local_id()));
171 
172  double start_time = goby::common::goby_time<double>();
173  waiting_for_request_ack_ = true;
174  while (goby::common::goby_time<double>() < start_time + cfg_.request_timeout())
175  {
176  zeromq_service().poll(10000);
177  if (!waiting_for_request_ack_)
178  {
179  send_file(cfg_.local_file());
180  break;
181  }
182  }
183 }
184 
185 void goby::acomms::FileTransfer::pull_file()
186 {
188  request.set_src(cfg_.local_id());
189  request.set_dest(cfg_.remote_id());
190  request.set_push_or_pull(protobuf::TransferRequest::PULL);
191  request.set_file(cfg_.remote_file());
192 
193  publish(request, "QueuePush" + goby::util::as<std::string>(cfg_.local_id()));
194 
195  // set up local request for receiving and writing
196  request.set_file(cfg_.local_file());
197  request.set_src(cfg_.remote_id());
198  request.set_dest(cfg_.local_id());
199  receive_files_[request.src()].clear();
200  requests_[request.src()] = request;
201 }
202 
203 int goby::acomms::FileTransfer::send_file(const std::string& path)
204 {
205  std::ifstream send_file(path.c_str(), std::ios::binary | std::ios::ate);
206 
207  glog.is(VERBOSE) && glog << "Attempting to transfer: " << path << std::endl;
208 
209  // check open
210  if (!send_file.is_open())
211  throw protobuf::TransferResponse::COULD_NOT_READ_FILE;
212 
213  // check size
214  std::streampos size = send_file.tellg();
215  glog.is(VERBOSE) && glog << "File size: " << size << std::endl;
216 
217  if (size > MAX_FILE_TRANSFER_BYTES)
218  {
219  glog.is(WARN) && glog << "File exceeds maximum supported size of "
220  << MAX_FILE_TRANSFER_BYTES << "B" << std::endl;
221  throw protobuf::TransferResponse::FILE_TOO_LARGE;
222  }
223 
224  // seek to front
225  send_file.seekg(0, send_file.beg);
226 
227  int size_acquired = 0;
228  // fragment into little bits
229 
230  int fragment_size = protobuf::FileFragment::descriptor()
231  ->FindFieldByName("data")
232  ->options()
233  .GetExtension(dccl::field)
234  .max_length();
235 
236  protobuf::FileFragment reference_fragment;
237  reference_fragment.set_src(cfg_.local_id());
238  reference_fragment.set_dest(cfg_.remote_id());
239 
240  std::vector<protobuf::FileFragment> fragments(std::ceil((double)size / fragment_size),
241  reference_fragment);
242 
243  std::vector<protobuf::FileFragment>::iterator fragments_it = fragments.begin();
244  std::vector<char> buffer(fragment_size);
245  int fragment_idx = 0;
246  while (send_file.good())
247  {
248  protobuf::FileFragment& fragment = *(fragments_it++);
249  send_file.read(&buffer[0], fragment_size);
250  int bytes_read = send_file.gcount();
251  size_acquired += bytes_read;
252  fragment.set_fragment(fragment_idx++);
253  if (size_acquired == size)
254  fragment.set_is_last_fragment(true);
255  else
256  fragment.set_is_last_fragment(false);
257  fragment.set_num_bytes(bytes_read);
258 
259  fragment.set_data(std::string(buffer.begin(), buffer.begin() + bytes_read));
260  }
261 
262  if (!send_file.eof())
263  throw protobuf::TransferResponse::ERROR_WHILE_READING;
264 
265  // FOR TESTING!
266  // fragments.resize(fragments.size()*2);
267  // std::copy_backward(fragments.begin(), fragments.begin()+fragments.size()/2, fragments.end());
268  // std::random_shuffle(fragments.begin(), fragments.end());
269  for (int i = 0, n = fragments.size(); i < n; ++i)
270  {
271  glog.is(VERBOSE) && glog << fragments[i].ShortDebugString() << std::endl;
272  publish(fragments[i], "QueuePush" + goby::util::as<std::string>(cfg_.local_id()));
273  }
274 
275  return fragment_idx;
276 }
277 
278 goby::acomms::FileTransfer::~FileTransfer() {}
279 
280 void goby::acomms::FileTransfer::handle_remote_transfer_request(
281  const protobuf::TransferRequest& request)
282 {
283  glog.is(VERBOSE) && glog << "Received remote transfer request: " << request.DebugString()
284  << std::endl;
285 
286  if (request.push_or_pull() == protobuf::TransferRequest::PUSH)
287  {
288  glog.is(VERBOSE) && glog << "Preparing to receive file..." << std::endl;
289  receive_files_[request.src()].clear();
290  }
291  else if (request.push_or_pull() == protobuf::TransferRequest::PULL)
292  {
294  response.set_src(request.dest());
295  response.set_dest(request.src());
296  try
297  {
298  response.set_num_fragments(send_file(request.file()));
299  response.set_transfer_successful(true);
300  }
301  catch (protobuf::TransferResponse::ErrorCode& c)
302  {
303  glog.is(WARN) && glog << "File transfer action failed: "
304  << protobuf::TransferResponse::ErrorCode_Name(c) << std::endl;
305  response.set_transfer_successful(false);
306  response.set_error(c);
307  if (!cfg_.daemon())
308  exit(EXIT_FAILURE);
309  }
310  catch (std::exception& e)
311  {
312  glog.is(WARN) && glog << "File transfer action failed: " << e.what() << std::endl;
313  if (!cfg_.daemon())
314  exit(EXIT_FAILURE);
315 
316  response.set_transfer_successful(false);
317  response.set_error(protobuf::TransferResponse::OTHER_ERROR);
318  }
319  publish(response, "QueuePush" + goby::util::as<std::string>(cfg_.local_id()));
320  }
321  requests_[request.src()] = request;
322 }
323 
324 void goby::acomms::FileTransfer::handle_receive_fragment(const protobuf::FileFragment& fragment)
325 {
326  std::map<int, protobuf::FileFragment>& receive = receive_files_[fragment.src()];
327 
328  receive.insert(std::make_pair(fragment.fragment(), fragment));
329 
330  glog.is(VERBOSE) && glog << "Received fragment #" << fragment.fragment()
331  << ", total received: " << receive.size() << std::endl;
332 
333  if (receive.rbegin()->second.is_last_fragment())
334  {
335  if ((int)receive.size() == receive.rbegin()->second.fragment() + 1)
336  {
338  response.set_src(requests_[fragment.src()].dest());
339  response.set_dest(requests_[fragment.src()].src());
340 
341  try
342  {
343  glog.is(VERBOSE) && glog << "Received all fragments!" << std::endl;
344  glog.is(VERBOSE) && glog << "Writing to " << requests_[fragment.src()].file()
345  << std::endl;
346  std::ofstream receive_file(requests_[fragment.src()].file().c_str(),
347  std::ios::binary);
348 
349  // check open
350  if (!receive_file.is_open())
351  throw(protobuf::TransferResponse::COULD_NOT_WRITE_FILE);
352 
353  for (std::map<int, protobuf::FileFragment>::const_iterator it = receive.begin(),
354  end = receive.end();
355  it != end; ++it)
356  { receive_file.write(it->second.data().c_str(), it->second.num_bytes()); }
357 
358  receive_file.close();
359  response.set_transfer_successful(true);
360  if (!cfg_.daemon())
361  exit(EXIT_SUCCESS);
362  }
363  catch (protobuf::TransferResponse::ErrorCode& c)
364  {
365  glog.is(WARN) && glog << "File transfer action failed: "
366  << protobuf::TransferResponse::ErrorCode_Name(c) << std::endl;
367  response.set_transfer_successful(false);
368  response.set_error(c);
369 
370  if (!cfg_.daemon())
371  exit(EXIT_FAILURE);
372  }
373  catch (std::exception& e)
374  {
375  glog.is(WARN) && glog << "File transfer action failed: " << e.what() << std::endl;
376  if (!cfg_.daemon())
377  exit(EXIT_FAILURE);
378 
379  response.set_transfer_successful(false);
380  response.set_error(protobuf::TransferResponse::OTHER_ERROR);
381  }
382  publish(response, "QueuePush" + goby::util::as<std::string>(cfg_.local_id()));
383  }
384  else
385  {
386  glog.is(VERBOSE) && glog << "Still waiting on some fragments..." << std::endl;
387  }
388  }
389 }
390 
391 void goby::acomms::FileTransfer::handle_receive_response(const protobuf::TransferResponse& response)
392 {
393  glog.is(VERBOSE) && glog << "Received response for file transfer: " << response.DebugString()
394  << std::flush;
395 
396  if (!response.transfer_successful())
397  glog.is(WARN) && glog << "Transfer failed: "
398  << protobuf::TransferResponse::ErrorCode_Name(response.error())
399  << std::endl;
400 
401  if (!cfg_.daemon())
402  {
403  if (response.transfer_successful())
404  {
405  glog.is(VERBOSE) && glog << "File transfer completed successfully." << std::endl;
406  }
407  else
408  {
409  exit(EXIT_FAILURE);
410  }
411  }
412 }
Base class provided for users to generate applications that participate in the Goby publish/subscribe...
Definition: application.h:49
double goby_time< double >()
Returns current UTC time as seconds and fractional seconds since 1970-01-01 00:00:00.
Definition: time.h:130
common::FlexOstream glog
Access the Goby logger through this object.
The global namespace for the Goby project.