Goby v2
test.cpp
1 // Copyright 2009-2018 Toby Schneider (http://gobysoft.org/index.wt/people/toby)
2 // GobySoft, LLC (2013-)
3 // Massachusetts Institute of Technology (2007-2014)
4 //
5 //
6 // This file is part of the Goby Underwater Autonomy Project Binaries
7 // ("The Goby Binaries").
8 //
9 // The Goby Binaries are free software: you can redistribute them and/or modify
10 // them under the terms of the GNU General Public License as published by
11 // the Free Software Foundation, either version 2 of the License, or
12 // (at your option) any later version.
13 //
14 // The Goby Binaries are distributed in the hope that they will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 // GNU General Public License for more details.
18 //
19 // You should have received a copy of the GNU General Public License
20 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
21 
22 // tests blackout functionality of ZeroMQService
23 
24 #include "goby/common/zeromq_service.h"
25 
26 #include <boost/thread.hpp>
27 
28 void node_inbox(goby::common::MarshallingScheme marshalling_scheme, const std::string& identifier,
29  const std::string& data, int socket_id);
30 
31 void run_basic_test(int test_count, int expected_blackouts, int ms_wait);
32 
33 const std::string identifier_ = "HI/";
34 int inbox_count_ = 0;
35 const char data_[] = {'h', 'i', '\0'};
36 
38 // must share context for ipc
39 goby::common::ZeroMQService node2_(node1_.zmq_context());
40 
41 enum
42 {
43  SOCKET_SUBSCRIBE = 240,
44  SOCKET_PUBLISH = 211
45 };
46 
47 int main(int argc, char* argv[])
48 {
49  goby::glog.add_stream(goby::common::logger::DEBUG3, &std::cerr);
50  goby::glog.set_name(argv[0]);
51 
52  goby::common::protobuf::ZeroMQServiceConfig publisher_cfg, subscriber_cfg;
53  {
55  subscriber_cfg.add_socket();
56  subscriber_socket->set_socket_type(
57  goby::common::protobuf::ZeroMQServiceConfig::Socket::SUBSCRIBE);
58  subscriber_socket->set_transport(goby::common::protobuf::ZeroMQServiceConfig::Socket::IPC);
59  subscriber_socket->set_connect_or_bind(
60  goby::common::protobuf::ZeroMQServiceConfig::Socket::CONNECT);
61 
62  subscriber_socket->set_socket_id(SOCKET_SUBSCRIBE);
63  subscriber_socket->set_socket_name("test3_ipc_socket");
64  std::cout << subscriber_socket->DebugString() << std::endl;
65  }
66 
67  {
69  publisher_cfg.add_socket();
70  publisher_socket->set_socket_type(
71  goby::common::protobuf::ZeroMQServiceConfig::Socket::PUBLISH);
72  publisher_socket->set_transport(goby::common::protobuf::ZeroMQServiceConfig::Socket::IPC);
73  publisher_socket->set_connect_or_bind(
74  goby::common::protobuf::ZeroMQServiceConfig::Socket::BIND);
75  publisher_socket->set_socket_name("test3_ipc_socket");
76  publisher_socket->set_socket_id(SOCKET_PUBLISH);
77  std::cout << publisher_socket->DebugString() << std::endl;
78  }
79 
80  node1_.set_cfg(publisher_cfg);
81 
82  node2_.set_cfg(subscriber_cfg);
83  node2_.connect_inbox_slot(&node_inbox);
84  node2_.subscribe_all(SOCKET_SUBSCRIBE);
85 
86  usleep(1e3);
87 
88  // test local blackout
89  node2_.socket_from_id(SOCKET_SUBSCRIBE)
90  .set_blackout(goby::common::MARSHALLING_CSTR, identifier_,
91  boost::posix_time::milliseconds(6));
92  run_basic_test(3, 1, 5);
93  run_basic_test(8, 4, 4);
94 
95  // remove local blackout
96  node2_.socket_from_id(SOCKET_SUBSCRIBE)
97  .clear_blackout(goby::common::MARSHALLING_CSTR, identifier_);
98  run_basic_test(3, 0, 5);
99  run_basic_test(8, 0, 4);
100 
101  // add global blackout
102  node2_.socket_from_id(SOCKET_SUBSCRIBE).set_global_blackout(boost::posix_time::milliseconds(6));
103  run_basic_test(3, 1, 5);
104  run_basic_test(8, 4, 4);
105 
106  // remove global blackout
107  node2_.socket_from_id(SOCKET_SUBSCRIBE).clear_global_blackout();
108  run_basic_test(3, 0, 5);
109  run_basic_test(8, 0, 4);
110 
111  // test both at once - local blackout will take precedence
112  node2_.socket_from_id(SOCKET_SUBSCRIBE).set_global_blackout(boost::posix_time::milliseconds(6));
113  node2_.socket_from_id(SOCKET_SUBSCRIBE)
114  .set_blackout(goby::common::MARSHALLING_CSTR, identifier_,
115  boost::posix_time::milliseconds(0));
116  run_basic_test(3, 0, 5);
117  run_basic_test(8, 0, 4);
118 
119  std::cout << "all tests passed" << std::endl;
120 }
121 
122 void run_basic_test(int test_count, int expected_blackouts, int ms_wait)
123 {
124  inbox_count_ = 0;
125  for (int i = 0; i < test_count; ++i)
126  {
127  std::cout << "publishing " << data_ << std::endl;
128  node1_.send(goby::common::MARSHALLING_CSTR, identifier_, std::string(data_),
129  SOCKET_PUBLISH);
130  node2_.poll(1e6);
131  // wait ms_wait milliseconds
132  usleep(ms_wait * 1e3);
133  }
134 
135  assert(inbox_count_ == test_count - expected_blackouts);
136 }
137 
138 void node_inbox(goby::common::MarshallingScheme marshalling_scheme, const std::string& identifier,
139  const std::string& data, int socket_id)
140 {
141  assert(identifier == identifier_);
142  assert(marshalling_scheme == goby::common::MARSHALLING_CSTR);
143  assert(!strcmp(data.c_str(), data_));
144  assert(socket_id == SOCKET_SUBSCRIBE);
145 
146  std::cout << "Received: " << data << std::endl;
147  ++inbox_count_;
148 }
void set_name(const std::string &s)
Set the name of the application that the logger is serving.
Definition: flex_ostream.h:67
common::FlexOstream glog
Access the Goby logger through this object.
void add_stream(logger::Verbosity verbosity=logger::VERBOSE, std::ostream *os=0)
Attach a stream object (e.g. std::cout, std::ofstream, ...) to the logger with desired verbosity...
Definition: flex_ostream.h:96