Reputation: 9808
I'm using ZeroMQ in Python and C++ in many configurations and I wonder which is the most elegant way to abort a recv()
or poll()
from another thread (e.g. in case of controlled program termination but also if you want to stop listening without the need to kill the socket).
In contrast to this question I don't just want to avoid infinitive wait but I want to return immediately from recv()
or poll()
.
I know I can just provide a timeout
and abort recv()
like this:
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
while _running:
if poller.poll(timeout=100) == []:
# maybe handle unwanted timout here..
continue
handle_message(socket.recv())
This will poll the socket endlessly until _running
is set to False
from another thread - after a maximum of 100 ms I'm done.
But this is not nice - I have a busy loop and it's hard this way to handle real timeouts which might be result of unwanted behavior. Also I have to wait for the timeout which is not critical in most cases but.. you know what I mean.
Of course I can poll an extra socket for abortion:
abort_socket = context.socket(zmq.SUB)
abort_socket.setsockopt(zmq.SUBSCRIBE, b"")
abort_socket.connect(<abort-publisher-endpoint>)
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
poller.register(abort_socket, zmq.POLLIN)
while _running:
poll_result = poller.poll(timeout=1000)
if socket in poll_result:
handle_message(socket.recv())
elif abort_socket in poll_result:
break
else:
# handle real timeout here
pass
But this approach also has disadvantages:
abort_socket
can only be used from one thread, so I would have to make this sureSo my question is: how is this done the nice way?
Can I somehow just use something like Python's threading.Event
or s.th. similar in other languages rather than the abort-socket which can be passed to the poller like this?:
def listener_thread_fn(event)
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
poller.register(event, zmq.POLLIN)
while _running:
poll_result = poller.poll(timeout=1000)
if socket in poll_result:
handle_message(socket.recv())
elif event in poll_result:
break
else:
# handle real timeout here
pass
So you just had to create a theading.Event()
in the first place, pass it to listener_thread_fn
and call event.set()
from any thread to abort.
Upvotes: 7
Views: 2288
Reputation: 1097
This question is old but still relevant and without a real answer as far as I can tell.
My solution (though more of a workaround) is to indeed just close the socket from outside the thread to instantly abort a polller.poll()
, socket.recv()
, etc.
This will however raise an exception in the ZMQ thread: ZMQError("not a socket", errno=128)
, which I decided to just catch and let pass. This directly conflicts with ZMQ's warning to only use a socket from a single thread, but I simply cannot find another method to cancel a query instantly.
The code looks like:
class ZMQClient:
# ...
def thread_loop(self):
# Called in a dedicated thread somewhere
try:
updated_sockets = dict(self._poller.poll(timeout=100))
except ZMQError as err:
if str(err) != "not a socket":
raise err # Don't show exception if context was killed
else:
if self.socket in updated_sockets:
msg = self.socket.recv()
# ...
# ...
def stop(self):
self.socket.close()
I am combining this with a PySide6 QT app, where I stop the loop from a closeEvent()
:
class MyMainWindow(QMainWindow):
def __init__():
# Start ZMQ thread
# self.client = ...
# ...
# ...
def closeEvent(self, event):
self.client.stop()
Upvotes: 0
Reputation: 14851
With Python and pyzmq
, an error is raised on recv()
or poll()
interruption; so you can simply catch the exception when it occurs. An example with recv()
:
while True:
try:
request = server.recv()
except zmq.ZMQError as e:
if e.errno == errno.EINTR:
print('Clean exit now!')
break
raise
You can easily modify that code to use poll()
instead (its the same procedure). Also, note that you'll need to:
import errno
Upvotes: 1