Reputation: 51
I want to add a timeout for my 0MQ client.
I tried zmq.Poller()
. It seems to work at the beginning. But when I move code into a function, I find it doesn't return anything. It just stuck there.
I have two print lines.
First print:
I print the result zmq_Response
successfully before this function returns. But when it comes to the next line, nothing returns.
Second print:
I guess that's why my last print does not work.
def send_message():
context = zmq.Context()
zmq_Socket = context.socket(zmq.REQ)
zmq_Socket.connect('tcp://localhost:5000')
zmq_Data = {'Register': 'default'}
zmq_Socket.send_string(json.dumps(zmq_Data), flags=0, encoding='utf8')
poller = zmq.Poller()
poller.register(zmq_Socket, flags=zmq.POLLIN)
if poller.poll(timeout=1000):
zmq_Response = zmq_Socket.recv_json()
else:
# raise IOError("Timeout processing auth request")
zmq_Response = {'test': 'test'}
poller.unregister(zmq_Socket)
print(zmq_Response) # **This print works!**
return zmq_Response
res = send_message()
print(res)
It is expected to print zmq_Response
but it does not.
Upvotes: 3
Views: 2754
Reputation: 131
I have implemented zmq.poller()
for timeout functionality with zmq.REQ
and zmq.REP
socket to handle deadlock. Have a look at the code.
For more explanation check out my repo
#author: [email protected]
#client.py
import zmq
context = zmq.Context()
# Socket to talk to server
print("Connecting to hello world server…")
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")
socket.setsockopt(zmq.LINGER, 0)
# use poll for timeouts:
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
# Do 10 requests, waiting each time for a response
for request in range(10):
print("Sending request %s …" % request)
socket.send(b"Hello")
'''
We have set response timeout of 6 sec.
'''
if poller.poll(6*1000):
message = socket.recv()
print("Received reply %s [ %s ]" % (request, message))
else:
print("Timeout processing auth request {}".format(request))
print("Terminating socket for old request {}".format(request))
socket.close()
context.term()
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")
socket.setsockopt(zmq.LINGER, 0)
poller.register(socket, zmq.POLLIN)
print("socket has been re-registered for request {}".format(request+1))
#server.py
import time
import zmq
context = zmq.Context()
#socket = context.socket(zmq.PULL)
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
#socket.RCVTIMEO = 6000
i = 0
while True:
# Wait for next request from client
message = socket.recv()
print("Received request: %s" % message)
#Get the reply.
time.sleep(i)
i+=1
# Send reply back to client
socket.send(b"World")
Upvotes: 0
Reputation: 51
I solve it now...
It seems that when the value of zmq_LINGER is the default value, which is -1, context will wait until messages have been sent successfully before allowing termination.
So I set zmq_LINGER to 1 at timeout branch. It works for now.
def send_message():
context = zmq.Context()
zmq_Socket = context.socket(zmq.REQ)
zmq_Socket.connect('tcp://localhost:5000')
zmq_Data = {'Register': 'default'}
zmq_Socket.send_string(json.dumps(zmq_Data), flags=0, encoding='utf8')
poller = zmq.Poller()
poller.register(zmq_Socket, flags=zmq.POLLIN)
if poller.poll(timeout=1000):
zmq_Response = zmq_Socket.recv_json()
else:
# --------------------------------------------
# I change the value of zmq.LINGER here.
zmq_Socket.setsockopt(zmq.LINGER, 1)
# --------------------------------------------
zmq_Response = {'test': 'test'}
poller.unregister(zmq_Socket)
print(zmq_Response)
return zmq_Response
res = send_message()
print(res)
Upvotes: 2