frans

Reputation: 9808

Abort zeromq recv() or poll() from another thread - instantly and without the need to wait for timeout

I'm using ZeroMQ in Python and C++ in many configurations and I wonder which is the most elegant way to abort a recv() or poll() from another thread (e.g. in case of controlled program termination but also if you want to stop listening without the need to kill the socket).

In contrast to this question, I don't just want to avoid an infinite wait; I want to return immediately from recv() or poll().

I know I can just provide a timeout and abort recv() like this:

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

while _running:
    if poller.poll(timeout=100) == []:
        # maybe handle unwanted timeout here..
        continue

    handle_message(socket.recv())

This will poll the socket endlessly until _running is set to False from another thread - after a maximum of 100 ms I'm done.

But this is not nice: I have a busy loop, and it becomes hard to handle real timeouts, which might be the result of unwanted behavior. I also have to wait for the timeout to expire, which is not critical in most cases, but you know what I mean.
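The timeout variant can at least tell a real timeout apart from a stop request if a deadline is tracked alongside the short poll slices. A minimal sketch, assuming a threading.Event takes the place of the _running flag; recv_with_deadline, stop_event, deadline_s and slice_ms are made-up names for illustration, not part of pyzmq:

```python
import threading
import time

import zmq


def recv_with_deadline(sock, stop_event, deadline_s, slice_ms=100):
    """Return ("message", data), ("timeout", None) or ("stopped", None)."""
    poller = zmq.Poller()
    poller.register(sock, zmq.POLLIN)
    deadline = time.monotonic() + deadline_s

    while not stop_event.is_set():
        remaining_ms = (deadline - time.monotonic()) * 1000
        if remaining_ms <= 0:
            # The full deadline passed without a message: a real timeout.
            return ("timeout", None)
        # Poll in short slices so a stop request is noticed within slice_ms.
        if poller.poll(timeout=max(1, int(min(slice_ms, remaining_ms)))):
            return ("message", sock.recv())

    return ("stopped", None)
```

This still wakes up every slice_ms and still reacts to a stop request only after the current slice ends, so it only addresses the "real timeout" half of the problem.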

Of course I can poll an extra socket for abortion:

abort_socket = context.socket(zmq.SUB)
abort_socket.setsockopt(zmq.SUBSCRIBE, b"")
abort_socket.connect(<abort-publisher-endpoint>)

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
poller.register(abort_socket, zmq.POLLIN)

while _running:
    # poll() returns (socket, event) pairs; dict() makes the `in` checks below work
    poll_result = dict(poller.poll(timeout=1000))
    if socket in poll_result:
        handle_message(socket.recv())
    elif abort_socket in poll_result:
        break
    else:
        # handle real timeout here
        pass
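A self-contained variant of the abort-socket idea, for reference. This is only a sketch: it swaps the SUB socket for an inproc PAIR socket, and the endpoint names, listen() and demo() are invented for the example. It also checks the abort socket first, so an abort wins over pending data.

```python
import threading

import zmq


def listen(ctx, data_endpoint, abort_endpoint, ready, results):
    """Poll a data socket and an abort socket until an abort arrives."""
    data = ctx.socket(zmq.PULL)
    data.bind(data_endpoint)
    abort = ctx.socket(zmq.PAIR)
    abort.bind(abort_endpoint)
    ready.set()  # both sockets are bound; peers may connect now

    poller = zmq.Poller()
    poller.register(data, zmq.POLLIN)
    poller.register(abort, zmq.POLLIN)
    try:
        while True:
            events = dict(poller.poll(timeout=1000))
            if abort in events:
                results.append("aborted")
                break
            if data in events:
                results.append(data.recv())
            # An empty dict here would be a real timeout.
    finally:
        data.close(linger=0)
        abort.close(linger=0)


def demo():
    ctx = zmq.Context()
    ready = threading.Event()
    results = []
    thread = threading.Thread(
        target=listen,
        args=(ctx, "inproc://demo-data", "inproc://demo-abort", ready, results),
    )
    thread.start()
    ready.wait(timeout=5)

    # Any thread can own this socket; sending one frame wakes poll() at once.
    abort_tx = ctx.socket(zmq.PAIR)
    abort_tx.connect("inproc://demo-abort")
    abort_tx.send(b"")
    thread.join(timeout=5)
    still_running = thread.is_alive()

    abort_tx.close(linger=0)
    ctx.term()
    return results, still_running
```

The 1000 ms timeout now only fires on a real timeout; the abort itself does not wait for it.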

But this approach also has disadvantages:

So my question is: how is this done the nice way?

Can I somehow use something like Python's threading.Event (or something similar in other languages) instead of the abort socket, and pass it to the poller like this?

def listener_thread_fn(event):

    poller = zmq.Poller()
    poller.register(socket, zmq.POLLIN)
    poller.register(event, zmq.POLLIN)

    while _running:
        poll_result = dict(poller.poll(timeout=1000))
        if socket in poll_result:
            handle_message(socket.recv())
        elif event in poll_result:
            break
        else:
            # handle real timeout here
            pass

So you would just have to create a threading.Event() up front, pass it to listener_thread_fn, and call event.set() from any thread to abort.
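A threading.Event has no file descriptor, so it cannot be polled directly, but zmq.Poller.register() also accepts native sockets and objects with a fileno() method. A rough sketch of an event-like wrapper built on socket.socketpair() follows; PollableEvent and demo() are hypothetical names, and this has not been checked on every platform. Note that poll() reports native sockets by their file descriptor number, not by the registered object.

```python
import socket
import threading

import zmq


class PollableEvent:
    """Event-like object backed by a socket pair so zmq.Poller can watch it."""

    def __init__(self):
        self._rx, self._tx = socket.socketpair()

    def fileno(self):
        # zmq.Poller polls this descriptor for readability.
        return self._rx.fileno()

    def set(self):
        # Writing one byte makes the read end readable and wakes poll().
        self._tx.send(b"\0")

    def close(self):
        self._rx.close()
        self._tx.close()


def listener_thread_fn(sock, event, results):
    poller = zmq.Poller()
    poller.register(sock, zmq.POLLIN)
    poller.register(event, zmq.POLLIN)

    while True:
        events = dict(poller.poll(timeout=1000))
        if event.fileno() in events:  # native sockets are keyed by fd
            results.append("aborted")
            break
        if sock in events:
            results.append(sock.recv())
        # An empty dict here would be a real timeout.


def demo():
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PULL)
    sock.bind("inproc://pollable-event-demo")
    event = PollableEvent()
    results = []

    thread = threading.Thread(target=listener_thread_fn, args=(sock, event, results))
    thread.start()
    event.set()  # callable from any thread
    thread.join(timeout=5)
    still_running = thread.is_alive()

    event.close()
    sock.close(linger=0)
    ctx.term()
    return results, still_running
```

Because the byte stays in the socket buffer until it is read, calling set() before the listener reaches poll() still aborts it.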

Upvotes: 7

Views: 2288

Answers (2)

Roberto

Reputation: 1097

This question is old but still relevant and without a real answer as far as I can tell.

My solution (though it is more of a workaround) is to simply close the socket from outside the thread, which instantly aborts a poller.poll(), socket.recv(), etc.
This will, however, raise an exception in the ZMQ thread: ZMQError("not a socket", errno=128), which I decided to just catch and let pass. This directly conflicts with ZMQ's warning to only use a socket from a single thread, but I simply cannot find another method to cancel a query instantly.

The code looks like:


class ZMQClient:

    # ...

    def thread_loop(self):
        # Called in a dedicated thread somewhere
        try:
            updated_sockets = dict(self._poller.poll(timeout=100))
        except ZMQError as err:
            # ENOTSOCK means stop() closed the socket; stay quiet in that case.
            # (Assumes `import zmq`; comparing errno is sturdier than the message text.)
            if err.errno != zmq.ENOTSOCK:
                raise
        else:
            if self.socket in updated_sockets:
                msg = self.socket.recv()
                # ...

    # ...

    def stop(self):
        self.socket.close()

I am combining this with a PySide6 Qt app, where I stop the loop from closeEvent():

class MyMainWindow(QMainWindow):

    def __init__(self):
        super().__init__()
        # Start ZMQ thread
        # self.client = ...
        # ...

    # ...

    def closeEvent(self, event):
        self.client.stop()

Upvotes: 0

Peque

Reputation: 14851

With Python and pyzmq, an error is raised on recv() or poll() interruption; so you can simply catch the exception when it occurs. An example with recv():

while True:
    try:
        request = server.recv()
    except zmq.ZMQError as e:
        if e.errno == errno.EINTR:
            print('Clean exit now!')
            break
        raise

You can easily modify that code to use poll() instead (it's the same procedure). Also, note that you'll need to:

import errno

Upvotes: 1
