DeusAduro

Reputation: 6076

PyZMQ req socket - hang on context.term()

I'm struggling to properly shut down a simple pyzmq-based client when the server is not available. Below are two snippets.

First the server. This is more or less the pyzmq example. No special code here:

import zmq
import json

port = 5555

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:{0}".format(port))

while True:
    message = socket.recv_json()
    print(json.dumps(message))
    socket.send_json({'response': 'Hello'})

Next the client.

import zmq

ip = 'localhost'
port = 5555
addr = "tcp://{0}:{1}".format(ip, port)
message = {'value': 10}

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect(addr)

if socket.poll(timeout=1000, flags=zmq.POLLOUT) != 0:
    socket.send_json(message, flags=zmq.NOBLOCK)        
    if socket.poll(timeout=1000, flags=zmq.POLLIN) != 0:
        response = socket.recv_json()
        print(response)

socket.disconnect(addr)
socket.close(linger=0)
context.term()

Here I've tried to enhance the default client with the ability to time out if the server is not available. The code above uses the poll method, although I've also tried setting a receive timeout on the socket.

If the server is running, the client sends and receives a response and exits cleanly.

If the server is not running, the client passes immediately through the first socket.poll call (since zmq just buffers the message internally). It blocks for 1 second on the second socket.poll call and correctly skips the recv_json block. It then hangs on the context.term() call. My understanding, from searching, is that this will hang if there are sockets that have not been closed, which doesn't seem to be the case here.

Any help is much appreciated.

Upvotes: 1

Views: 1305

Answers (1)

user3666197

Reputation: 1

On "ability to timeout if the server is not available"

A timeout is possible, but it will not let the hard-wired REQ/REP two-step dance survive, let alone continue in a proper manner, if one side times out of an otherwise mandatory step in a distributed Finite State Automaton scheme ( a dFSA cannot take one-sided shortcuts; both sides have to advance together ).
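To make the lockstep concrete, here is a toy model in plain Python. It is not pyzmq code: the class ReqLockstepModel and the helper second_send_is_rejected are hypothetical names of mine, and the model only mimics the send/recv alternation of a REQ socket (libzmq reports EFSM where this sketch raises RuntimeError).

```python
class ReqLockstepModel:
    """Toy model of the send -> recv alternation a REQ socket enforces."""

    def __init__(self):
        # False: the next legal step is send(); True: the next legal step is recv()
        self.awaiting_reply = False

    def send(self):
        if self.awaiting_reply:
            # A real REQ socket fails here with EFSM
            raise RuntimeError("cannot send: a reply is still outstanding")
        self.awaiting_reply = True

    def recv(self):
        if not self.awaiting_reply:
            raise RuntimeError("cannot recv: no request was sent")
        self.awaiting_reply = False


def second_send_is_rejected():
    """Send, skip the reply (as after a timeout), then try to send again."""
    req = ReqLockstepModel()
    req.send()
    try:
        req.send()
    except RuntimeError:
        return True
    return False
```

In this model a client that gives up waiting for the reply is stuck in the "awaiting reply" state, which is why a timed-out REQ socket is usually closed and re-created rather than reused.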


Hypothesis:

If the server is not running, the client passes immediately through the first socket.poll call (since zmq just buffers the message internally). It blocks for 1 second on the second socket.poll call and correctly skips the recv_json block. It then hangs on the context.term() call.

Validation:

Let's review the code step by step, with a timestamped print after each call:

def  Test( SetImmediate = False ):
     ##################################################################################
     import zmq, json, time;                                                      print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "import-s: DONE... VER: " ), zmq.zmq_version() )
     ##################################################################################
     ip      = 'localhost';                                                       print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "ip SET..." ) )
     port    =  5555;                                                             print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "port SET..." ) )
     addr    = "tcp://{0}:{1}".format( ip, port );                                print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "addr SET..." ) )
     message = { 'value': 10 };                                                   print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "message SET..." ) )
     ##################################################################################
     context = zmq.Context();                                                     print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Context INSTANTIATED..." ),                       "|", zmq.strerror( zmq.zmq_errno() ) )
     pass;              aReqSock = context.socket( zmq.REQ );                     print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket INSTANTIATED..." ),                        "|", zmq.strerror( zmq.zmq_errno() ) )
     ##################################################################################################################################################################################################################################
     pass;         rc = aReqSock.getsockopt(       zmq.LINGER       );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.LINGER    ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
     pass;              aReqSock.setsockopt(       zmq.LINGER,    0 );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.setsockopt( zmq.LINGER    ) SET..." ),     "|", zmq.strerror( zmq.zmq_errno() ) ) # do not let LINGER block on closing sockets with waiting msgs
     pass;         rc = aReqSock.getsockopt(       zmq.LINGER       );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.LINGER    ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
     ##################################################################################################################################################################################################################################
     pass;         rc = aReqSock.getsockopt(       zmq.IMMEDIATE    );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.IMMEDIATE ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
     if SetImmediate:
                        aReqSock.setsockopt(       zmq.IMMEDIATE, 1 );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.setsockopt( zmq.IMMEDIATE ) SET..." ),     "|", zmq.strerror( zmq.zmq_errno() ) ) # do not enqueue msgs for incomplete connections
     pass;         rc = aReqSock.getsockopt(       zmq.IMMEDIATE    );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.IMMEDIATE ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
     ##################################################################################################################################################################################################################################
     pass;              aReqSock.connect( addr );                                 print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.connect() DONE..." ),                      "|", zmq.strerror( zmq.zmq_errno() ) )
     ##################################################################################
     pass;        rc  = aReqSock.poll( timeout = 1000, flags = zmq.POLLOUT );     print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc = .poll( 1000 [ms], zmq.POLLOUT ) SET..." ),   "|", zmq.strerror( zmq.zmq_errno() ) )
     if      0 != rc: # aReqSock.poll( timeout = 1000, flags = zmq.POLLOUT ) != 0:# .poll() BLOCKS ~ 1s +NEVER gets a .POLLOUT for an empty TxQueue, does it?
         pass;                                                                    print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc was NON-ZERO... ==  " ), rc )
         pass;          aReqSock.send_json( message,   flags = zmq.NOBLOCK )      # .send()-s dispatches message the REP-side may .recv() at some later time
         pass;                                                                    print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), ".send_json( zmq.NOBLOCK ): DONE..." ),            "|", zmq.strerror( zmq.zmq_errno() ) )
         pass;    rc  = aReqSock.poll( timeout = 1000, flags = zmq.POLLIN  );     print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc = .poll( 1000 [ms], zmq.POLLIN ) SET..." ),    "|", zmq.strerror( zmq.zmq_errno() ) )
         if  0 != rc: # aReqSock.poll( timeout = 1000, flags = zmq.POLLIN  ) != 0:# .poll() BLOCKS < 1s = depends on REP-side response latency ( turn-around-time )
             pass;                                                                print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc was NON-ZERO... == " ), rc )
             response = aReqSock.recv_json()                                      # .recv() BLOCKS until ... if ever ...
             print( response );                                                   print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), ".recv_json() COMPLETED" ),                       "|", zmq.strerror( zmq.zmq_errno() ) )
     pass;                                                                        print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "if-ed code-block COMPLETED" ) )
     ##################################################################################
     rc = aReqSock.disconnect( addr );                                            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.disconnect() RETURNED CODE ~ " ), rc,     "|", zmq.strerror( zmq.zmq_errno() ) )
     rc = aReqSock.close(      linger = 0 );                                      print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.close() RETURNED CODE ~ " ),      rc,     "|", zmq.strerror( zmq.zmq_errno() ) )
     rc = context.term();                                                         print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Context.term() RETURNED CODE ~ " ),      rc,     "|", zmq.strerror( zmq.zmq_errno() ) )
     ##################################################################################

This produces output along these lines:

>>> Test( SetImmediate = False )
____947107.0356056700_ACK: import-s: DONE... VER:  4.2.5
____947107.0356727780_ACK: ip SET...
____947107.0356969039_ACK: port SET...
____947107.0357236000_ACK: addr SET...
____947107.0357460320_ACK: message SET...
____947107.0358552620_ACK: Context INSTANTIATED... | Success
____947107.0362445670_ACK: Socket INSTANTIATED... | Success
____947107.0363074190_ACK: Socket.getsockopt( zmq.LINGER    ) GOT... -1 | Success
____947107.0363573120_ACK: Socket.setsockopt( zmq.LINGER    ) SET... | Invalid argument
____947107.0364004780_ACK: Socket.getsockopt( zmq.LINGER    ) GOT... 0 | Success
____947107.0364456220_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 0 | Success
____947107.0364890840_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 0 | Success
____947107.0365797410_ACK: Socket.connect() DONE... | Resource temporarily unavailable
____947107.0366972820_ACK: rc = .poll( 1000 [ms], zmq.POLLOUT ) SET... | Resource temporarily unavailable
____947107.0367464600_ACK: rc was NON-ZERO... ==   2
____947107.0368948240_ACK: .send_json( zmq.NOBLOCK ): DONE... | Resource temporarily unavailable
____947108.0381633660_ACK: rc = .poll( 1000 [ms], zmq.POLLIN ) SET... | Resource temporarily unavailable
____947108.0382736750_ACK: if-ed code-block COMPLETED
____947108.0383544239_ACK: Socket.disconnect() RETURNED CODE ~  None | Resource temporarily unavailable
____947108.0384234400_ACK: Socket.close() RETURNED CODE ~  None | Invalid argument
____947108.0386644470_ACK: Context.term() RETURNED CODE ~  None | Success

and

>>> Test( SetImmediate = True )
____947119.1267617550_ACK: import-s: DONE... VER:  4.2.5
____947119.1268189061_ACK: ip SET...
____947119.1268382660_ACK: port SET...
____947119.1268587380_ACK: addr SET...
____947119.1268772170_ACK: message SET...
____947119.1269678050_ACK: Context INSTANTIATED... | Success
____947119.1271884360_ACK: Socket INSTANTIATED... | Success
____947119.1272257260_ACK: Socket.getsockopt( zmq.LINGER    ) GOT... -1 | Success
____947119.1272587100_ACK: Socket.setsockopt( zmq.LINGER    ) SET... | Invalid argument
____947119.1272875509_ACK: Socket.getsockopt( zmq.LINGER    ) GOT... 0 | Success
____947119.1273175071_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 0 | Success
____947119.1273461781_ACK: Socket.setsockopt( zmq.IMMEDIATE ) SET... | Invalid argument
____947119.1273732870_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 1 | Success
____947119.1274376540_ACK: Socket.connect() DONE... | Resource temporarily unavailable
____947120.1287043930_ACK: rc = .poll( 1000 [ms], zmq.POLLOUT ) SET... | Resource temporarily unavailable
____947120.1287937190_ACK: if-ed code-block COMPLETED
____947120.1288697980_ACK: Socket.disconnect() RETURNED CODE ~  None | Resource temporarily unavailable
____947120.1289412400_ACK: Socket.close() RETURNED CODE ~  None | Invalid argument
____947120.1291404651_ACK: Context.term() RETURNED CODE ~  None | Success

This shows the hypothesis to be incorrect: the problem is not with context.term(), but with how a .connect( aTransportClass_Target ) against a target that is not present is handled internally.
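Stripped of the timing prints, the sequence exercised by Test( SetImmediate = True ) can be sketched roughly as below. The function name request_once and its parameters are my own illustration, not part of pyzmq. It assumes that setting zmq.LINGER to 0 before connecting and closing the socket in a finally block is acceptable for the use case.

```python
import zmq


def request_once(addr, message, timeout_ms=1000):
    """Send one JSON request; return the reply, or None if the peer stays silent."""
    context = zmq.Context()
    sock = context.socket(zmq.REQ)
    sock.setsockopt(zmq.LINGER, 0)     # discard unsent messages when closing
    sock.setsockopt(zmq.IMMEDIATE, 1)  # queue messages only on completed connections
    reply = None
    try:
        sock.connect(addr)
        # With IMMEDIATE == 1 this poll times out if no server accepts the connection
        if sock.poll(timeout=timeout_ms, flags=zmq.POLLOUT):
            sock.send_json(message, flags=zmq.DONTWAIT)
            if sock.poll(timeout=timeout_ms, flags=zmq.POLLIN):
                reply = sock.recv_json()
    finally:
        sock.close()
        context.term()
    return reply
```

Calling request_once( "tcp://localhost:5555", { 'value': 10 } ) with no server listening should return None after roughly timeout_ms, and the context should terminate without blocking because nothing is left lingering in the socket.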

Note that the 2 returned by .poll( zmq.POLLOUT ) in the first run is not a count of queued items. .poll() returns an event mask, and zmq.POLLOUT == 2, so the value only says that the REQ socket is ready to accept a .send(), even though no explicit .send() has been made yet ( the .poll() was launched right after a .connect() ).

With the default zmq.IMMEDIATE == 0, a message may be queued for a connection that has not completed yet, so the socket reports .POLLOUT-readiness although no server is present. With zmq.IMMEDIATE == 1 the same .poll() times out after ~1 s and returns 0, as the second run shows.

Either way, I hope to have sufficiently shown that the problem has nothing to do with the .LINGER == 0 / .term()-ination of the Context()-instance.
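For reading the rc values in the logs above, a small helper can decode the integer that Socket.poll() returns as an event mask. The helper describe_poll_result is a hypothetical name of mine. The two constants are written out locally so the sketch runs without pyzmq, and they are assumed to match the libzmq values that pyzmq exposes as zmq.POLLIN and zmq.POLLOUT.

```python
# Flag values as defined by libzmq (exposed in pyzmq as zmq.POLLIN / zmq.POLLOUT)
POLLIN = 1
POLLOUT = 2


def describe_poll_result(rc):
    """Turn the integer returned by Socket.poll() into readable event names."""
    if rc == 0:
        return ["timeout"]  # no requested event arrived before the timeout
    names = []
    if rc & POLLIN:
        names.append("POLLIN")
    if rc & POLLOUT:
        names.append("POLLOUT")
    return names
```

Read this way, the "rc was NON-ZERO... == 2" line in the first run means "ready to send", not "two messages queued".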

Q.E.D.

Upvotes: 3
