156 zmq::context_t& context, std::atomic<bool>& alive,
157 std::shared_ptr<std::condition_variable> poller_cv);
161#ifdef USE_OLD_CPPZMQ_SETSOCKOPT
162 control_socket_.setsockopt(ZMQ_LINGER, 0);
163 subscribe_socket_.setsockopt(ZMQ_LINGER, 0);
164 manager_socket_.setsockopt(ZMQ_LINGER, 0);
166 control_socket_.set(zmq::sockopt::linger, 0);
167 subscribe_socket_.set(zmq::sockopt::linger, 0);
168 manager_socket_.set(zmq::sockopt::linger, 0);
// Declaration only; the definition is not in this part of the file.
// timeout_ms: timeout in milliseconds, defaulting to -1.
// NOTE(review): presumably -1 means "wait indefinitely" as with zmq_poll — confirm in the definition.
173 void poll(
long timeout_ms = -1);
// Declaration only. Takes one received ZeroMQ message by const reference.
// NOTE(review): presumably invoked for traffic arriving on control_socket_ — confirm.
174 void control_data(
const zmq::message_t& zmq_msg);
// Declaration only. Takes one received ZeroMQ message by const reference.
// NOTE(review): presumably invoked for traffic arriving on subscribe_socket_ — confirm.
175 void subscribe_data(
const zmq::message_t& zmq_msg);
// Declaration only. Takes one received ZeroMQ message by const reference.
// NOTE(review): presumably invoked for traffic arriving on manager_socket_ — confirm.
176 void manager_data(
const zmq::message_t& zmq_msg);
// The three ZeroMQ sockets held by value. Each has its linger option set to 0
// by the setsockopt()/set() calls that appear earlier in this file.
182 zmq::socket_t control_socket_;
183 zmq::socket_t subscribe_socket_;
184 zmq::socket_t manager_socket_;
// Reference to an atomic flag that lives outside this object (not owned here).
// NOTE(review): presumably cleared by the owner to request shutdown — confirm.
185 std::atomic<bool>& alive_;
// Shared handle to a condition variable.
// NOTE(review): presumably notified to wake the poller when data arrives — confirm.
186 std::shared_ptr<std::condition_variable> poller_cv_;
// Poll descriptors handed to the ZeroMQ poll call.
// NOTE(review): presumably one entry per socket above — confirm.
187 std::vector<zmq::pollitem_t> poll_items_;
// Starts out false.
// NOTE(review): presumably set once the publish/subscribe sockets are configured — confirm.
198 bool have_pubsub_sockets_{
false};
// Starts out false.
// NOTE(review): presumably true while a manager request is outstanding — confirm.
200 bool manager_waiting_for_reply_{
false};
205 std::chrono::milliseconds(100)};
213 :
public PortalBase<InterProcessPortalImplementation<InnerTransporter, PortalBase,
215 InnerTransporter, ImplementationTag>
220 InnerTransporter, ImplementationTag>;
225 zmq_context_(cfg.zeromq_number_io_threads()),
226 zmq_main_(zmq_context_),
227 zmq_read_thread_(cfg_, zmq_context_, zmq_alive_, middleware::PollerInterface::cv())
236 zmq_context_(cfg.zeromq_number_io_threads()),
237 zmq_main_(zmq_context_),
238 zmq_read_thread_(cfg_, zmq_context_, zmq_alive_, middleware::PollerInterface::cv())
247 zmq_main_.reader_shutdown();
259 friend typename Base::Base;
260 friend typename Base::Common;
268 zmq_thread_ = std::make_unique<std::thread>([
this]() { zmq_read_thread_.run(); });
270 while (!zmq_main_.subscribe_ready())
273 if (zmq_main_.recv(&control_msg))
275 switch (control_msg.
type())
277 case protobuf::InprocControl::PUB_CONFIGURATION:
290 middleware::MarshallingScheme::PROTOBUF>(
291 [
this](std::shared_ptr<const protobuf::ManagerResponse> response)
294 << response->ShortDebugString() << std::endl;
295 if (response->request() == protobuf::PROVIDE_HOLD_STATE &&
296 response->client_pid() == getpid() &&
297 response->client_name() == cfg_.client_name())
299 zmq_main_.set_hold_state(response->hold());
303 if (zmq_main_.publish_ready())
306 middleware::MarshallingScheme::PROTOBUF>(
307 groups::manager_response,
314 void _do_publish(
const std::string& identifier,
const std::vector<char>& bytes)
316 zmq_main_.publish(identifier, &bytes[0], bytes.size(), ignore_buffer_);
319 void _do_portal_subscribe(
const std::string& identifier) { zmq_main_.subscribe(identifier); }
320 void _do_portal_unsubscribe(
const std::string& identifier)
322 zmq_main_.unsubscribe(identifier);
325 void _do_portal_wildcard_subscribe() { zmq_main_.subscribe(delimiter_str); }
326 void _do_portal_wildcard_unsubscribe() { zmq_main_.unsubscribe(delimiter_str); }
// Moves every control message currently available from zmq_main_ into its
// control buffer without blocking, then handles the buffered messages from
// front to back.
// lock: forwarded to _handle_received_data() when a RECEIVE message is handled.
328 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock)
331 protobuf::InprocControl new_control_msg;
// Select the non-blocking receive flag for whichever cppzmq API is compiled in.
333#ifdef USE_OLD_ZMQ_CPP_API
334 int flags = ZMQ_NOBLOCK;
336 auto flags = zmq::recv_flags::dontwait;
// Queue each message as it is received; the loop ends when recv() returns false.
339 while (zmq_main_.recv(&new_control_msg, flags))
340 zmq_main_.control_buffer().push_back(new_control_msg);
// Handle queued messages one at a time; each is popped after it is handled.
342 while (!zmq_main_.control_buffer().empty())
// Reference into the buffer: valid only until the pop_front() below.
344 const auto& control_msg = zmq_main_.control_buffer().front();
345 switch (control_msg.type())
// Data arrived: hand the payload on together with the caller's lock.
347 case protobuf::InprocControl::RECEIVE:
350 this->_handle_received_data(lock, control_msg.received_data());
// Build a ManagerRequest that reports ready_, the client name and the pid,
// and asks for PROVIDE_HOLD_STATE.
354 case protobuf::InprocControl::REQUEST_HOLD_STATE:
356 protobuf::ManagerRequest req;
358 req.set_ready(ready_);
359 req.set_request(protobuf::PROVIDE_HOLD_STATE);
360 req.set_client_name(cfg_.client_name());
361 req.set_client_pid(getpid());
364 << req.ShortDebugString() << std::endl;
// ignore_buffer_ is read by _do_publish() and passed to zmq_main_.publish();
// it is true only for the duration of this one publish.
// NOTE(review): presumably this lets the request go out while ordinary
// publications are still being buffered — confirm in publish().
366 ignore_buffer_ =
true;
367 this->
template publish<groups::manager_request>(req);
368 ignore_buffer_ =
false;
// Done with this message; control_msg must not be used past this point.
374 zmq_main_.control_buffer().pop_front();
// Immutable configuration held by value; also passed to the zmq_read_thread_
// constructor.
380 const protobuf::InterProcessPortalConfig cfg_;
// Thread object created with a lambda that calls zmq_read_thread_.run().
382 std::unique_ptr<std::thread> zmq_thread_;
// Atomic flag, initially true, handed by reference to zmq_read_thread_.
// NOTE(review): presumably cleared to tell the read thread to stop — confirm.
383 std::atomic<bool> zmq_alive_{
true};
// Declaration order matters here: members are initialized in the order they
// are declared, and both zmq_main_ and zmq_read_thread_ are constructed from
// zmq_context_ (zmq_read_thread_ also from zmq_alive_), so those must come first.
384 zmq::context_t zmq_context_;
385 InterProcessPortalMainThread zmq_main_;
386 InterProcessPortalReadThread zmq_read_thread_;
// Forwarded as the ignore_buffer argument of zmq_main_.publish() by
// _do_publish(); _poll() sets it to true around the hold-state request.
389 bool ignore_buffer_{
false};
423 :
Manager(context, cfg, router)
425 for (
const auto& req_c : hold.
required_client()) required_clients_.insert(req_c);
437 std::set<std::string> reported_clients_;
438 std::set<std::string> required_clients_;
440 zmq::context_t& context_;
444 std::vector<zmq::pollitem_t> poll_items_;
448 SOCKET_SUBSCRIBE = 1,
455 std::unique_ptr<zmq::socket_t> manager_socket_;
456 std::unique_ptr<zmq::socket_t> subscribe_socket_;
457 std::unique_ptr<zmq::socket_t> publish_socket_;
461 protobuf::ManagerRequest, middleware::scheme<protobuf::ManagerRequest>()>::type_name(),
465 std::string zmq_filter_rep_{
467 middleware::SerializerParserHelper<
468 protobuf::ManagerResponse,
469 middleware::scheme<protobuf::ManagerResponse>()>::type_name(),
static std::string make_identifier(const std::string &type_name, int scheme, const std::string &group, IdentifierWildcard wildcard, const std::string &process, std::unordered_map< int, std::string > *schemes_buffer=nullptr, std::unordered_map< std::thread::id, std::string > *threads_buffer=nullptr)
void set_publish_cfg(const protobuf::Socket &cfg)
void unsubscribe(const std::string &identifier)
std::deque< protobuf::InprocControl > & control_buffer()
InterProcessPortalMainThread(zmq::context_t &context)
void subscribe(const std::string &identifier)
void publish(const std::string &identifier, const char *bytes, int size, bool ignore_buffer=false)
void send_control_msg(const protobuf::InprocControl &control)
bool recv(protobuf::InprocControl *control_msg, zmq_recv_flags_type flags=zmq_recv_flags_type())