Reputation: 6121
I'm trying to design an object-oriented interface to ZeroMQ.
I want to use a zmq receiver in my processes, but I don't want it to block the process with its loop. So I'm trying to start the loop in another thread.
    class BaseZmqReceiver(BaseZmqNode):
        __metaclass__ = ABCMeta

        def __init__(self, host, port, hwm, bind, on_receive_callback):
            super(BaseZmqReceiver, self).__init__(host=host, port=port, bind=bind, hwm=hwm)
            self.node.on_message_callback = on_receive_callback
            self.stream = ZMQStream(self.socket)
            self.stream.on_recv(self.on_message_received)
            ZmqLoopRunner().start()

        def on_message_received(self, message):
            return self.node.on_message_callback(message)

        def create_node(self):
            return ReceivingNode(None, None)


    class ZmqLoopRunner(Thread):

        def __init__(self):
            super(ZmqLoopRunner, self).__init__()
            self.loop = IOLoop.instance()

        def run(self):
            self.loop.start()

        def stop(self):
            self.loop.stop()
But I don't know how to properly stop this thread, because the loop.start() method is blocking. How can I do that?
Upvotes: 0
Views: 211
Reputation: 13495
1) If this is about the Tornado IOLoop (update: it is not), The Right Way is to use the nonblocking integration inside a single IOLoop.
2) To stop an IOLoop by hand, call IOLoop.instance().stop() from the IOLoop thread:

    IOLoop.instance().add_callback(IOLoop.instance().stop)

add_callback() makes sure that the stop() method is called inside the IOLoop event thread, so the IOLoop is stopped cleanly.
In your code this would be:
    class BaseZmqReceiver(BaseZmqNode):
        __metaclass__ = ABCMeta

        def __init__(self, host, port, hwm, bind, on_receive_callback):
            super(BaseZmqReceiver, self).__init__(
                host=host, port=port, bind=bind, hwm=hwm)
            # ...
            self.zmq_runner = ZmqLoopRunner()
            # this will start a new thread.
            self.zmq_runner.start()

        def stop(self):
            self.zmq_runner.stop()


    class ZmqLoopRunner(Thread):
        # ...

        def stop(self):
            """
            Call this from any thread, add_callback() will make sure
            it runs in IOLoop thread.
            """
            self.loop.add_callback(self.loop.stop)  # Note the absence of parentheses
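For anyone who wants to try the pattern without pyzmq installed, here is a minimal sketch of the same idea using only the standard library. It is an analogue, not the ZeroMQ code itself: it swaps the IOLoop for an asyncio event loop and uses call_soon_threadsafe() in the role that add_callback() plays above. The LoopRunner class and the started event are hypothetical names made up for this illustration.

```python
import asyncio
import threading


class LoopRunner(threading.Thread):
    """Hypothetical stand-in for ZmqLoopRunner, built on a stdlib asyncio loop."""

    def __init__(self):
        super().__init__()
        self.loop = asyncio.new_event_loop()
        # Set from inside the loop, so other threads can tell it is running.
        self.started = threading.Event()

    def run(self):
        asyncio.set_event_loop(self.loop)
        self.loop.call_soon(self.started.set)
        self.loop.run_forever()  # blocks this thread until loop.stop() runs
        self.loop.close()

    def stop(self):
        # Safe to call from any thread: the stop is scheduled on the loop's
        # own thread instead of being invoked directly from the caller.
        self.loop.call_soon_threadsafe(self.loop.stop)


runner = LoopRunner()
runner.start()
runner.started.wait(timeout=5)

runner.stop()            # called from the main thread
runner.join(timeout=5)   # returns once run_forever() has exited

stopped_cleanly = not runner.is_alive()
```

The point is the same as in the answer: the thread that wants the loop stopped never touches the loop directly, it only asks the loop to stop itself.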
3) If you need the thread to exit on program shutdown, you can make it a daemon thread. This is even less correct, though, since the IOLoop won't be shut down cleanly.
    class ZmqLoopRunner(Thread):

        def __init__(self):
            super(ZmqLoopRunner, self).__init__()
            self.daemon = True
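To see what the daemon flag actually buys you, here is a small stdlib-only sketch. It assumes nothing about ZeroMQ: BlockingWorker is a hypothetical thread whose run() blocks forever, much like loop.start() does.

```python
import threading


class BlockingWorker(threading.Thread):
    """Hypothetical thread whose run() never returns on its own."""

    def __init__(self):
        super().__init__()
        self.daemon = True  # do not keep the interpreter alive for this thread
        self._never_set = threading.Event()

    def run(self):
        # Blocks indefinitely, standing in for a blocking loop.start().
        self._never_set.wait()


worker = BlockingWorker()
worker.start()

is_daemon = worker.daemon
still_running = worker.is_alive()
# The script can end here without joining: the interpreter exits and the
# daemon thread is simply abandoned, with no cleanup code run inside it.
```

That abandonment is exactly why this option is the least clean of the three: nothing inside the thread gets a chance to close sockets or stop the loop.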
Upvotes: 1