user1685095
user1685095

Reputation: 6121

Stop python thread that contains blocking loop

I'm trying to design object oriented interface to ZeroMQ.

I wan't to use zmq receiver in my processes but I don't want it to block the process with it's loop. So I'm trying to start loop in another thread.

class BaseZmqReceiver(BaseZmqNode):
    __metaclass__ = ABCMeta

    def __init__(self, host, port, hwm, bind, on_receive_callback):
        super(BaseZmqReceiver, self).__init__(host=host, port=port, bind=bind, hwm=hwm)
        self.node.on_message_callback = on_receive_callback
        self.stream = ZMQStream(self.socket)
        self.stream.on_recv(self.on_message_received)
        ZmqLoopRunner().start()

    def on_message_received(self, message):
        return self.node.on_message_callback(message)

    def create_node(self):
        return ReceivingNode(None, None)

class ZmqLoopRunner(Thread):

    def __init__(self):
        super(ZmqLoopRunner, self).__init__()
        self.loop = IOLoop.instance()

    def run(self):
        self.loop.start()

    def stop(self):
        self.loop.stop()

But I don't know how to properly stop this thread, because the loop.start() method is blocking. How can I do that?

Upvotes: 0

Views: 211

Answers (1)

Victor Sergienko
Victor Sergienko

Reputation: 13495

1) If it's about Tornado IOLoop (update: it is not), The Right Way is to use the nonblocking integration inside a single IOLoop.

2) To stop an IOLoop by hand, one calls IOLoop.instance().stop() from the IOLoop thread:

IOLoop.instance().add_callback(IOLoop.instance().stop)

add_callback() makes sure that stop() method is called inside the IOLoop event thread, and IOLoop is stopped cleanly.

In your code this would be:

class BaseZmqReceiver(BaseZmqNode):
    __metaclass__ = ABCMeta

    def __init__(self, host, port, hwm, bind, on_receive_callback):
        super(BaseZmqReceiver, self).__init__(
            host=host, port=port, bind=bind, hwm=hwm)
        # ...
        self.zmq_runner = ZmqLoopRunner()
        # this will start a new thread.
        self.zmq_runner.start()

    def stop(self):
        self.zmq_runner.stop()

class ZmqLoopRunner(Thread):
    # ...
    def stop(self):
        """ 
        Call this from any thread, add_callback() will make sure 
        it runs in IOLoop thread.
        """
        self.loop.add_callback(self.loop.stop)  # Note the absence of parentheses

3) If you need the thread to exit on program shutdown, you can make it daemon. Even more wrong, as it won't cleanly shut down the IOLoop.

class ZmqLoopRunner(Thread):
    def __init__(self):
        super(ZmqLoopRunner, self).__init__()
        self.daemon = True

Upvotes: 1

Related Questions