Kevin
Kevin

Reputation: 51

Use zmq.Poller() to add timeout for my REQ/REP zmqclient, but the function does not return anything

I want to add a timeout for my 0MQ client.

I tried zmq.Poller(). It seems to work at the beginning. But when I move code into a function, I find it doesn't return anything. It just stuck there.

I have two print lines.

First print:
I print the result zmq_Response successfully before this function returns. But when it comes to the next line, nothing returns.

Second print:
I guess that's why my last print does not work.

def send_message():
    context = zmq.Context()
    zmq_Socket = context.socket(zmq.REQ)
    zmq_Socket.connect('tcp://localhost:5000')
    zmq_Data = {'Register': 'default'}
    zmq_Socket.send_string(json.dumps(zmq_Data), flags=0, encoding='utf8')
    poller = zmq.Poller()
    poller.register(zmq_Socket, flags=zmq.POLLIN)
    if poller.poll(timeout=1000):
        zmq_Response = zmq_Socket.recv_json()
    else:
        # raise IOError("Timeout processing auth request")
        zmq_Response = {'test': 'test'}
    poller.unregister(zmq_Socket)
    print(zmq_Response) # **This print works!**
    return zmq_Response


res = send_message()
print(res)

It is expected to print zmq_Response but it does not.

Upvotes: 3

Views: 2754

Answers (2)

Nitin Ashutosh
Nitin Ashutosh

Reputation: 131

I have implemented zmq.poller() for timeout functionality with zmq.REQ and zmq.REP socket to handle deadlock. Have a look at the code.

For more explanation check out my repo

Client Code

#author: [email protected]
#client.py
import zmq

context = zmq.Context()

#  Socket to talk to server
print("Connecting to hello world server…")
socket = context.socket(zmq.REQ)

socket.connect("tcp://localhost:5555")
socket.setsockopt(zmq.LINGER, 0)
# use poll for timeouts:
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

#  Do 10 requests, waiting each time for a response
for request in range(10):
    print("Sending request %s …" % request)
    socket.send(b"Hello")
    '''
    We have set response timeout of 6 sec.
    '''
    if poller.poll(6*1000):
        message = socket.recv()
        print("Received reply %s [ %s ]" % (request, message))
    else:
        print("Timeout processing auth request {}".format(request))
        print("Terminating socket for old request {}".format(request))
        socket.close()
        context.term()
        context = zmq.Context()
        socket = context.socket(zmq.REQ)
        socket.connect("tcp://localhost:5555")
        socket.setsockopt(zmq.LINGER, 0)
        poller.register(socket, zmq.POLLIN)
        print("socket has been re-registered for request {}".format(request+1))

Server Code

#server.py
import time
import zmq

context = zmq.Context()
#socket = context.socket(zmq.PULL)
socket = context.socket(zmq.REP)

socket.bind("tcp://*:5555")
#socket.RCVTIMEO = 6000
i = 0 
while True:
    #  Wait for next request from client
    message = socket.recv()
    print("Received request: %s" % message)
    #Get the reply.
    time.sleep(i)
    i+=1
    #  Send reply back to client
    socket.send(b"World")

Upvotes: 0

Kevin
Kevin

Reputation: 51

I solve it now...

It seems that when the value of zmq_LINGER is the default value, which is -1, context will wait until messages have been sent successfully before allowing termination.

So I set zmq_LINGER to 1 at timeout branch. It works for now.

def send_message():
    context = zmq.Context()
    zmq_Socket = context.socket(zmq.REQ)
    zmq_Socket.connect('tcp://localhost:5000')
    zmq_Data = {'Register': 'default'}
    zmq_Socket.send_string(json.dumps(zmq_Data), flags=0, encoding='utf8')
    poller = zmq.Poller()
    poller.register(zmq_Socket, flags=zmq.POLLIN)
    if poller.poll(timeout=1000):
        zmq_Response = zmq_Socket.recv_json()
    else:

        # --------------------------------------------
        # I change the value of zmq.LINGER here.
        zmq_Socket.setsockopt(zmq.LINGER, 1)
        # --------------------------------------------

        zmq_Response = {'test': 'test'}
    poller.unregister(zmq_Socket)
    print(zmq_Response)
    return zmq_Response


res = send_message()
print(res)

Upvotes: 2

Related Questions