Reputation: 133
Part of my application consists of a main thread and a couple of worker threads that send their results to the main thread using pyzmq
sockets. The main thread is running a tornado IOloop
and uses async
functions to read the incoming data on a variety of socket types created using a future.context
.
For performance reasons I would like to use the inproc
protocol. However inproc
only works when the main thread and the worker threads share the identical context. This on the other hand would require that each worker thread needs to run a tornado IOloop
, which I think is bit a overkill for simple workers.
A minimal example to illustrate the problem:
import time
from threading import Thread
import zmq
from zmq.eventloop.future import Context as FutureContext
import tornado.ioloop
def worker(ctx):
socket = ctx.socket(zmq.PUSH)
socket.bind('inproc://worker')
while True:
# Do some work
time.sleep(2)
socket.send_pyobj("Work done")
async def mainLoop(ctx):
socket = ctx.socket(zmq.PULL)
socket.connect('inproc://worker')
while True:
#print(socket.recv_pyobj())
print(await socket.recv_pyobj())
normalCtx = zmq.Context()
futureCtx = FutureContext()
t = Thread(target=worker, kwargs=dict(ctx=normalCtx))
t.start()
# wait for bind to be effective
time.sleep(4)
io_loop = tornado.ioloop.IOLoop.current()
io_loop.spawn_callback(mainLoop, ctx=futureCtx)
io_loop.start()
In the example, the PULL
socket won't receive messages since it is issued on a different context than the worker socket. The example works fine if I use the normalCtx
in both threads (removing await
). It also works fine when using TCP as the transport protocol.
The solutions to get it working I found are:
normal
context and give up the async/await
.future
context and run an ioloop
in each worker.My question is if there is a magic trick to get it working with async/await
, inproc
and not having to run ioloops
in the workers, e.g. by accessing the future
context in a non-future
way?
Upvotes: 2
Views: 111