157 zmq::context_t& context, std::atomic<bool>& alive,
158 std::shared_ptr<std::condition_variable> poller_cv);
162#ifdef USE_OLD_CPPZMQ_SETSOCKOPT
163 control_socket_.setsockopt(ZMQ_LINGER, 0);
164 subscribe_socket_.setsockopt(ZMQ_LINGER, 0);
165 manager_socket_.setsockopt(ZMQ_LINGER, 0);
167 control_socket_.set(zmq::sockopt::linger, 0);
168 subscribe_socket_.set(zmq::sockopt::linger, 0);
169 manager_socket_.set(zmq::sockopt::linger, 0);
174 void poll(
long timeout_ms = -1);
175 void control_data(
const zmq::message_t& zmq_msg);
176 void subscribe_data(
const zmq::message_t& zmq_msg);
177 void manager_data(
const zmq::message_t& zmq_msg);
183 zmq::socket_t control_socket_;
184 zmq::socket_t subscribe_socket_;
185 zmq::socket_t manager_socket_;
186 std::atomic<bool>& alive_;
187 std::shared_ptr<std::condition_variable> poller_cv_;
188 std::vector<zmq::pollitem_t> poll_items_;
199 bool have_pubsub_sockets_{
false};
201 bool manager_waiting_for_reply_{
false};
206 std::chrono::milliseconds(100)};
214 :
public PortalBase<InterProcessPortalImplementation<InnerTransporter, PortalBase,
216 InnerTransporter, ImplementationTag>
221 InnerTransporter, ImplementationTag>;
226 zmq_context_(cfg.zeromq_number_io_threads()),
227 zmq_main_(zmq_context_),
228 zmq_read_thread_(cfg_, zmq_context_, zmq_alive_, middleware::PollerInterface::cv())
237 zmq_context_(cfg.zeromq_number_io_threads()),
238 zmq_main_(zmq_context_),
239 zmq_read_thread_(cfg_, zmq_context_, zmq_alive_, middleware::PollerInterface::cv())
248 zmq_main_.reader_shutdown();
260 friend typename Base::Base;
261 friend typename Base::Common;
269 zmq_thread_ = std::make_unique<std::thread>([
this]() { zmq_read_thread_.run(); });
271 while (!zmq_main_.subscribe_ready())
274 if (zmq_main_.recv(&control_msg))
276 switch (control_msg.
type())
278 case protobuf::InprocControl::PUB_CONFIGURATION:
291 middleware::MarshallingScheme::PROTOBUF>(
292 [
this](std::shared_ptr<const protobuf::ManagerResponse> response)
295 << response->ShortDebugString() << std::endl;
296 if (response->request() == protobuf::PROVIDE_HOLD_STATE &&
297 response->client_pid() == getpid() &&
298 response->client_name() == cfg_.client_name())
300 zmq_main_.set_hold_state(response->hold());
304 if (zmq_main_.publish_ready())
307 middleware::MarshallingScheme::PROTOBUF>(
308 groups::manager_response,
// Publish hook: forwards an already-serialized message to the main-thread
// ZeroMQ wrapper.
//   identifier - identifier string passed unchanged to zmq_main_.publish()
//   bytes      - serialized payload; passed on as a raw pointer plus size
// The current value of ignore_buffer_ is passed as the final argument.
// NOTE(review): ignore_buffer_ presumably lets a message skip whatever
// buffering publish() performs — confirm against
// InterProcessPortalMainThread::publish().
315 void _do_publish(
const std::string& identifier,
const std::vector<char>& bytes)
317 zmq_main_.publish(identifier, bytes.data(), bytes.size(), ignore_buffer_);
// Subscribe hook: forwards `identifier` unchanged to zmq_main_.subscribe().
320 void _do_portal_subscribe(
const std::string& identifier) { zmq_main_.subscribe(identifier); }
// Unsubscribe hook: forwards `identifier` unchanged to zmq_main_.unsubscribe().
321 void _do_portal_unsubscribe(
const std::string& identifier)
323 zmq_main_.unsubscribe(identifier);
// Wildcard subscribe hook: subscribes using `delimiter_str` as the identifier.
// NOTE(review): presumably every published identifier starts with
// `delimiter_str`, so this matches all messages — confirm against the
// identifier format and the subscribe() filter semantics.
326 void _do_portal_wildcard_subscribe() { zmq_main_.subscribe(delimiter_str); }
// Wildcard unsubscribe hook: unsubscribes using `delimiter_str` as the
// identifier, i.e. the same identifier the wildcard subscribe hook uses.
327 void _do_portal_wildcard_unsubscribe() { zmq_main_.unsubscribe(delimiter_str); }
329 int _poll(std::unique_ptr<std::unique_lock<std::mutex>>& lock)
332 protobuf::InprocControl new_control_msg;
334#ifdef USE_OLD_ZMQ_CPP_API
335 int flags = ZMQ_NOBLOCK;
337 auto flags = zmq::recv_flags::dontwait;
340 while (zmq_main_.recv(&new_control_msg, flags))
341 zmq_main_.control_buffer().push_back(new_control_msg);
343 while (!zmq_main_.control_buffer().empty())
345 const auto& control_msg = zmq_main_.control_buffer().front();
346 switch (control_msg.type())
348 case protobuf::InprocControl::RECEIVE:
351 this->_handle_received_data(lock, control_msg.received_data());
355 case protobuf::InprocControl::REQUEST_HOLD_STATE:
357 protobuf::ManagerRequest req;
359 req.set_ready(ready_);
360 req.set_request(protobuf::PROVIDE_HOLD_STATE);
361 req.set_client_name(cfg_.client_name());
362 req.set_client_pid(getpid());
365 << req.ShortDebugString() << std::endl;
367 ignore_buffer_ =
true;
368 this->
template publish<groups::manager_request>(req);
369 ignore_buffer_ =
false;
375 zmq_main_.control_buffer().pop_front();
381 const protobuf::InterProcessPortalConfig cfg_;
383 std::unique_ptr<std::thread> zmq_thread_;
384 std::atomic<bool> zmq_alive_{
true};
385 zmq::context_t zmq_context_;
386 InterProcessPortalMainThread zmq_main_;
387 InterProcessPortalReadThread zmq_read_thread_;
390 bool ignore_buffer_{
false};
424 :
Manager(context, cfg, router)
426 for (
const auto& req_c : hold.
required_client()) required_clients_.insert(req_c);
438 std::set<std::string> reported_clients_;
439 std::set<std::string> required_clients_;
441 zmq::context_t& context_;
445 std::vector<zmq::pollitem_t> poll_items_;
449 SOCKET_SUBSCRIBE = 1,
456 std::unique_ptr<zmq::socket_t> manager_socket_;
457 std::unique_ptr<zmq::socket_t> subscribe_socket_;
458 std::unique_ptr<zmq::socket_t> publish_socket_;
462 protobuf::ManagerRequest, middleware::scheme<protobuf::ManagerRequest>()>::type_name(),
466 std::string zmq_filter_rep_{
468 middleware::SerializerParserHelper<
469 protobuf::ManagerResponse,
470 middleware::scheme<protobuf::ManagerResponse>()>::type_name(),
static std::string make_identifier(const std::string &type_name, int scheme, const std::string &group, IdentifierWildcard wildcard, const std::string &process, std::unordered_map< int, std::string > *schemes_buffer=nullptr, std::unordered_map< std::thread::id, std::string > *threads_buffer=nullptr)
void set_publish_cfg(const protobuf::Socket &cfg)
void unsubscribe(const std::string &identifier)
std::deque< protobuf::InprocControl > & control_buffer()
InterProcessPortalMainThread(zmq::context_t &context)
void subscribe(const std::string &identifier)
void publish(const std::string &identifier, const char *bytes, int size, bool ignore_buffer=false)
void send_control_msg(const protobuf::InprocControl &control)
bool recv(protobuf::InprocControl *control_msg, zmq_recv_flags_type flags=zmq_recv_flags_type())