Terrordrone

Reputation: 139

ZMQ detect client unavailability with heartbeat

I am experimenting with ZeroMQ, but I could not find in the docs or examples how to properly use the heartbeat socket options (ZMQ_HEARTBEAT_IVL, ZMQ_HEARTBEAT_TIMEOUT and ZMQ_HEARTBEAT_TTL, as set in the server code below).

The documentation states that they can be used with any socket type ("when using connection-oriented transports"). Is there a way to use them with REQ/REP socket pairs?

Test client code:

#include <zmq.hpp>
#include <cstring>
#include <string>

zmq::context_t context( 1 );
zmq::socket_t client( context, zmq::socket_type::req );
client.connect( "tcp://localhost:5555" );

// Send request.
std::string request( "test message" );
zmq::message_t message( request.size() );
memcpy( message.data(), request.data(), request.size() );
client.send( message, zmq::send_flags::none );

// Wait for reply.
client.recv( message, zmq::recv_flags::none );

// Do some work that takes approx. 10 secs.
// ...

// Close the socket once the work is done.
client.close();

Test server code:

#include <zmq.hpp>

zmq::context_t context( 1 );
zmq::socket_t server( context, zmq::socket_type::rep );

//server.set( zmq::sockopt::rcvtimeo, 5000 );
// Heartbeat options, all in milliseconds.
server.set( zmq::sockopt::heartbeat_ivl, 1000 );
server.set( zmq::sockopt::heartbeat_timeout, 3000 );
server.set( zmq::sockopt::heartbeat_ttl, 3000 );

server.bind( "tcp://*:5555" );

// Wait for request.
zmq::message_t message;
server.recv( message, zmq::recv_flags::none );

// Send reply.
zmq::message_t reply( "Test reply.", 11 );
server.send( reply, zmq::send_flags::none );

// Wait for new request (this is the call that never returns).
server.recv( message, zmq::recv_flags::none );

// ...

My goal is to end the server side after the client is done. The zmq::sockopt::rcvtimeo socket option works well: it closes the connection on the server side. I thought that the heartbeat socket options would do something similar, that is, close the server after the client has finished its job and not earlier. I tried them in all possible combinations on both the server and the client side, but the server always gets stuck in the second recv operation. Checking the localhost network traffic with Wireshark shows that the ping-pong messages are delivered, but that's all.

So my questions are:

Thanks in advance!

Upvotes: 0

Views: 1410

Answers (1)

bazza

Reputation: 8394

Ok, so it looks like you need to use the zmq_socket_monitor() function. Calling it makes the library create a ZMQ_PAIR socket and bind it to the inproc:// endpoint you specify. You then create your own ZMQ_PAIR socket and connect it to that same endpoint.

You can read event messages from your ZMQ_PAIR socket (see the API reference for how these messages are formatted). You can therefore include that PAIR socket in a call to zmq_poll(), so that your application is informed asynchronously when the status of the socket you passed to zmq_socket_monitor() changes.
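As a rough illustration of that message format, here is a minimal sketch of decoding the first frame of a monitor event. It assumes the layout described in the zmq_socket_monitor() reference for libzmq 4.x: a 16-bit event id followed by a 32-bit event value, both in host byte order, with the endpoint string arriving in a second frame. MonitorEvent, decode_monitor_frame and kEventDisconnected are hypothetical names made up for this example; the constant repeats the value of ZMQ_EVENT_DISCONNECTED from zmq.h so that the snippet compiles without libzmq.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>

// Same value as ZMQ_EVENT_DISCONNECTED in zmq.h, repeated here only so the
// sketch builds on its own. Real code should use the macro from zmq.h.
constexpr std::uint16_t kEventDisconnected = 0x0200;

// Hypothetical holder for the contents of the first monitor frame.
struct MonitorEvent {
    std::uint16_t id;     // which event happened (a ZMQ_EVENT_* value)
    std::uint32_t value;  // event-specific data, e.g. a file descriptor
};

// Decodes the first frame of a monitor message into `out`.
// Returns false if the frame is too short to hold id + value (6 bytes).
bool decode_monitor_frame(const void* data, std::size_t size, MonitorEvent& out) {
    if (data == nullptr || size < sizeof out.id + sizeof out.value) {
        return false;
    }
    const unsigned char* bytes = static_cast<const unsigned char*>(data);
    // memcpy avoids unaligned reads; the fields are in host byte order.
    std::memcpy(&out.id, bytes, sizeof out.id);
    std::memcpy(&out.value, bytes + sizeof out.id, sizeof out.value);
    return true;
}
```

In real code the data pointer and size would come from zmq_msg_data() and zmq_msg_size() on the first frame read from the monitor PAIR socket.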

The event that will be reported for a heartbeat timeout is, I think, ZMQ_EVENT_DISCONNECTED.

See here.

I'd say that, to get the most value out of this, you're pretty much obliged to use zmq_poll() (or similar). For example, a client expecting to receive messages can't be both blocked on a call to zmq_recv() from the socket connected to the server, and also blocked on the zmq_recv() from the monitor ZMQ_PAIR socket. Whereas if you're blocked on a call to zmq_poll() that's waiting for both the server and monitor sockets, then that'll return either when there is a message from the server, or some problem has arisen that means you're not going to get the message.

Upvotes: 0
