Reputation: 6076
I'm struggling to properly shut down a simple pyzmq-based client when the server is not available. Below are two snippets.
First the server. This is more or less the pyzmq example. No special code here:
import zmq
import json
port = 5555
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:{0}".format(port))
while True:
    message = socket.recv_json()
    print(json.dumps(message))
    socket.send_json({'response': 'Hello'})
Next the client.
import zmq
ip = 'localhost'
port = 5555
addr = "tcp://{0}:{1}".format(ip, port)
message = {'value': 10}
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect(addr)
if socket.poll(timeout=1000, flags=zmq.POLLOUT) != 0:
    socket.send_json(message, flags=zmq.NOBLOCK)
    if socket.poll(timeout=1000, flags=zmq.POLLIN) != 0:
        response = socket.recv_json()
        print(response)
socket.disconnect(addr)
socket.close(linger=0)
context.term()
Here I've tried to enhance the default client with the ability to time out if the server is not available. The code above uses the poll method, although I've also tried setting a receive timeout on the socket.
If the server is running, the client sends and receives a response and exits cleanly.
If the server is not running, the client passes immediately through the first socket.poll call (since zmq just buffers the message internally). It blocks for 1 second on the second socket.poll call and correctly skips the recv_json block. It then hangs on the context.term() call. My understanding, from searching, is that this will hang if there are sockets that have not been closed, which doesn't seem to be the case here.
Any help is much appreciated.
Upvotes: 1
Views: 1305
Reputation: 1
A timeout is possible, yet it will not allow the hard-wired REQ/REP two-step dance to survive, let alone continue in a proper manner, if one side times out of an otherwise mandatory step in a distributed Finite State Automaton (dFSA) scheme. A dFSA cannot take one-sided shortcuts, because it is a dual-sided automaton.
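The lockstep can be seen directly in a few lines. This is a minimal sketch, not part of the original test: the function name second_send_errno and the default address are placeholders chosen for illustration. It assumes that a REQ socket which has already sent one request refuses a second send until a reply has been received, and that pyzmq surfaces this as a zmq.ZMQError carrying the EFSM error number.

```python
import zmq


def second_send_errno(addr="tcp://127.0.0.1:5555"):
    """Send twice on a REQ socket without receiving in between.

    Returns the errno raised by the second send, or None if it succeeded.
    The address is a placeholder; no server needs to be listening there.
    """
    ctx = zmq.Context()
    sock = ctx.socket(zmq.REQ)
    sock.setsockopt(zmq.LINGER, 0)  # let close()/term() drop the queued request
    sock.connect(addr)
    try:
        # First step of the dance: the request is accepted and queued.
        sock.send_json({"value": 10}, flags=zmq.NOBLOCK)
        try:
            # Second send without a recv in between breaks the REQ state machine.
            sock.send_json({"value": 11}, flags=zmq.NOBLOCK)
        except zmq.ZMQError as exc:
            return exc.errno
        return None
    finally:
        sock.close()
        ctx.term()


if __name__ == "__main__":
    print(second_send_errno() == zmq.EFSM)
```

A client that times out on the reply and then wants to send again is therefore better off discarding the socket and opening a fresh one than reusing the old one.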
"If the server is not running, the client passes immediately through the first socket.poll call (since zmq just buffers the message internally). It blocks for 1 second on the second socket.poll call and correctly skips the recv_json block. It then hangs on the context.term() call."
Let's review the code step by step:
def Test( SetImmediate = False ):
    ##################################################################################
    import zmq, json, time; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "import-s: DONE... VER: " ), zmq.zmq_version() )
    ##################################################################################
    ip = 'localhost'; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "ip SET..." ) )
    port = 5555; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "port SET..." ) )
    addr = "tcp://{0}:{1}".format( ip, port ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "addr SET..." ) )
    message = { 'value': 10 }; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "message SET..." ) )
    ##################################################################################
    context = zmq.Context(); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Context INSTANTIATED..." ), "|", zmq.strerror( zmq.zmq_errno() ) )
    pass; aReqSock = context.socket( zmq.REQ ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket INSTANTIATED..." ), "|", zmq.strerror( zmq.zmq_errno() ) )
    ##################################################################################################################################################################################################################################
    pass; rc = aReqSock.getsockopt( zmq.LINGER ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.LINGER ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
    pass; aReqSock.setsockopt( zmq.LINGER, 0 ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.setsockopt( zmq.LINGER ) SET..." ), "|", zmq.strerror( zmq.zmq_errno() ) ) # do not let LINGER block on closing sockets with waiting msgs
    pass; rc = aReqSock.getsockopt( zmq.LINGER ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.LINGER ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
    ##################################################################################################################################################################################################################################
    pass; rc = aReqSock.getsockopt( zmq.IMMEDIATE ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.IMMEDIATE ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
    if SetImmediate:
        aReqSock.setsockopt( zmq.IMMEDIATE, 1 ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.setsockopt( zmq.IMMEDIATE ) SET..." ), "|", zmq.strerror( zmq.zmq_errno() ) ) # do not enqueue msgs for incomplete connections
    pass; rc = aReqSock.getsockopt( zmq.IMMEDIATE ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.IMMEDIATE ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
    ##################################################################################################################################################################################################################################
    pass; aReqSock.connect( addr ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.connect() DONE..." ), "|", zmq.strerror( zmq.zmq_errno() ) )
    ##################################################################################
    pass; rc = aReqSock.poll( timeout = 1000, flags = zmq.POLLOUT ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc = .poll( 1000 [ms], zmq.POLLOUT ) SET..." ), "|", zmq.strerror( zmq.zmq_errno() ) )
    if 0 != rc: # aReqSock.poll( timeout = 1000, flags = zmq.POLLOUT ) != 0:# .poll() BLOCKS ~ 1s +NEVER gets a .POLLOUT for an empty TxQueue, does it?
        pass; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc was NON-ZERO... == " ), rc )
        pass; aReqSock.send_json( message, flags = zmq.NOBLOCK ) # .send()-s dispatches message the REP-side may .recv() at some later time
        pass; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), ".send_json( zmq.NOBLOCK ): DONE..." ), "|", zmq.strerror( zmq.zmq_errno() ) )
        pass; rc = aReqSock.poll( timeout = 1000, flags = zmq.POLLIN ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc = .poll( 1000 [ms], zmq.POLLIN ) SET..." ), "|", zmq.strerror( zmq.zmq_errno() ) )
        if 0 != rc: # aReqSock.poll( timeout = 1000, flags = zmq.POLLIN ) != 0:# .poll() BLOCKS < 1s = depends on REP-side response latency ( turn-around-time )
            pass; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc was NON-ZERO... == " ), rc )
            response = aReqSock.recv_json() # .recv() BLOCKS until ... if ever ...
            print( response ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), ".recv_json() COMPLETED" ), "|", zmq.strerror( zmq.zmq_errno() ) )
    pass; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "if-ed code-block COMPLETED" ) )
    ##################################################################################
    rc = aReqSock.disconnect( addr ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.disconnect() RETURNED CODE ~ " ), rc, "|", zmq.strerror( zmq.zmq_errno() ) )
    rc = aReqSock.close( linger = 0 ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.close() RETURNED CODE ~ " ), rc, "|", zmq.strerror( zmq.zmq_errno() ) )
    rc = context.term(); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Context.term() RETURNED CODE ~ " ), rc, "|", zmq.strerror( zmq.zmq_errno() ) )
    ##################################################################################
This produces output along these lines:
>>> Test( SetImmediate = False )
____947107.0356056700_ACK: import-s: DONE... VER: 4.2.5
____947107.0356727780_ACK: ip SET...
____947107.0356969039_ACK: port SET...
____947107.0357236000_ACK: addr SET...
____947107.0357460320_ACK: message SET...
____947107.0358552620_ACK: Context INSTANTIATED... | Success
____947107.0362445670_ACK: Socket INSTANTIATED... | Success
____947107.0363074190_ACK: Socket.getsockopt( zmq.LINGER ) GOT... -1 | Success
____947107.0363573120_ACK: Socket.setsockopt( zmq.LINGER ) SET... | Invalid argument
____947107.0364004780_ACK: Socket.getsockopt( zmq.LINGER ) GOT... 0 | Success
____947107.0364456220_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 0 | Success
____947107.0364890840_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 0 | Success
____947107.0365797410_ACK: Socket.connect() DONE... | Resource temporarily unavailable
____947107.0366972820_ACK: rc = .poll( 1000 [ms], zmq.POLLOUT ) SET... | Resource temporarily unavailable
____947107.0367464600_ACK: rc was NON-ZERO... == 2
____947107.0368948240_ACK: .send_json( zmq.NOBLOCK ): DONE... | Resource temporarily unavailable
____947108.0381633660_ACK: rc = .poll( 1000 [ms], zmq.POLLIN ) SET... | Resource temporarily unavailable
____947108.0382736750_ACK: if-ed code-block COMPLETED
____947108.0383544239_ACK: Socket.disconnect() RETURNED CODE ~ None | Resource temporarily unavailable
____947108.0384234400_ACK: Socket.close() RETURNED CODE ~ None | Invalid argument
____947108.0386644470_ACK: Context.term() RETURNED CODE ~ None | Success
and
>>> Test( SetImmediate = True )
____947119.1267617550_ACK: import-s: DONE... VER: 4.2.5
____947119.1268189061_ACK: ip SET...
____947119.1268382660_ACK: port SET...
____947119.1268587380_ACK: addr SET...
____947119.1268772170_ACK: message SET...
____947119.1269678050_ACK: Context INSTANTIATED... | Success
____947119.1271884360_ACK: Socket INSTANTIATED... | Success
____947119.1272257260_ACK: Socket.getsockopt( zmq.LINGER ) GOT... -1 | Success
____947119.1272587100_ACK: Socket.setsockopt( zmq.LINGER ) SET... | Invalid argument
____947119.1272875509_ACK: Socket.getsockopt( zmq.LINGER ) GOT... 0 | Success
____947119.1273175071_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 0 | Success
____947119.1273461781_ACK: Socket.setsockopt( zmq.IMMEDIATE ) SET... | Invalid argument
____947119.1273732870_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 1 | Success
____947119.1274376540_ACK: Socket.connect() DONE... | Resource temporarily unavailable
____947120.1287043930_ACK: rc = .poll( 1000 [ms], zmq.POLLOUT ) SET... | Resource temporarily unavailable
____947120.1287937190_ACK: if-ed code-block COMPLETED
____947120.1288697980_ACK: Socket.disconnect() RETURNED CODE ~ None | Resource temporarily unavailable
____947120.1289412400_ACK: Socket.close() RETURNED CODE ~ None | Invalid argument
____947120.1291404651_ACK: Context.term() RETURNED CODE ~ None | Success
This proves the hypothesis incorrect: the problem is not with context.term(), but with how a .connect( aTransportClass_Target ) against a target that is not present is handled internally.
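The difference between the two runs can be isolated in a few lines. This is a hedged sketch rather than the original test: pollout_mask is a hypothetical helper name, and the address passed in is assumed to have no server listening on it. It only asks what .poll( zmq.POLLOUT ) reports right after .connect(), with zmq.IMMEDIATE switched off or on.

```python
import zmq


def pollout_mask(addr, immediate, timeout_ms=200):
    """Return what poll(POLLOUT) reports right after connect().

    addr is assumed to point at an endpoint with no server behind it.
    """
    ctx = zmq.Context()
    sock = ctx.socket(zmq.REQ)
    sock.setsockopt(zmq.LINGER, 0)  # never block close()/term() on queued data
    sock.setsockopt(zmq.IMMEDIATE, 1 if immediate else 0)
    sock.connect(addr)
    try:
        # poll() returns an event mask, or 0 if the timeout expired.
        return sock.poll(timeout=timeout_ms, flags=zmq.POLLOUT)
    finally:
        sock.close()
        ctx.term()


if __name__ == "__main__":
    target = "tcp://127.0.0.1:5555"  # placeholder address
    print(pollout_mask(target, immediate=False))
    print(pollout_mask(target, immediate=True))
```

Going by the two logs above, the first call should report a non-zero mask almost at once, while the second should wait for timeout_ms and then report 0, since with zmq.IMMEDIATE set nothing may be enqueued for an incomplete connection.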
To my surprise, in the version under test ( v4.2.5 ) the .poll( zmq.POLLOUT ) returns 2 right after the .connect(), without a single explicit .send() having been made. The value 2 equals the zmq.POLLOUT flag, so it is an event mask reporting the socket as ready to send, not a count of items waiting in the TxQueue.
This seems to me to be somewhat inconsistent with previous versions ( as if the socket were reporting some .connect()-associated "protocol/identity" telemetry instead of reporting just the state relevant to user-app-level messages ).
I might be wrong in trying to find a rationale for why a socket with no reachable peer would report itself ready in the .POLLOUT direction, but I hope to have sufficiently proved that the problem has nothing to do with .LINGER == 0 or the .term()-ination of the Context() instance.
Q.E.D.
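For a client that simply gives up when no reply arrives, the pieces exercised in Test() can be condensed as follows. This is a sketch under stated assumptions, not a definitive implementation: request_with_timeout is a hypothetical helper name, the socket is used for a single request and then thrown away (so the interrupted REQ/REP cycle never has to be resumed), and LINGER is set to 0 before connecting so that the queued request is discarded on close.

```python
import zmq


def request_with_timeout(addr, message, timeout_ms=1000):
    """Send one JSON request and return the reply, or None on timeout."""
    ctx = zmq.Context()
    sock = ctx.socket(zmq.REQ)
    sock.setsockopt(zmq.LINGER, 0)  # drop unsent messages when closing
    sock.connect(addr)
    try:
        # The request is queued locally even if the server is not up yet.
        sock.send_json(message, flags=zmq.NOBLOCK)
        # Wait for a reply for at most timeout_ms milliseconds.
        if sock.poll(timeout=timeout_ms, flags=zmq.POLLIN):
            return sock.recv_json()
        return None
    finally:
        # One-shot socket: close it and terminate the context either way.
        sock.close()
        ctx.term()


if __name__ == "__main__":
    print(request_with_timeout("tcp://localhost:5555", {"value": 10}))
```

With the server from the question running, this should print the reply; with no server, it should print None after about one second and then exit.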
Upvotes: 3