Schwingkopf
Schwingkopf

Reputation: 133

Pyzmq's eventloop.future context with 'inproc' in worker threads

Part of my application consists of a main thread and a couple of worker threads that send their results to the main thread using pyzmq sockets. The main thread is running a tornado IOloop and uses async functions to read the incoming data on a variety of socket types created using a future.context.

For performance reasons I would like to use the inproc protocol. However inproc only works when the main thread and the worker threads share the identical context. This on the other hand would require that each worker thread needs to run a tornado IOloop, which I think is bit a overkill for simple workers.

A minimal example to illustrate the problem:

import time
from threading import Thread

import zmq
from zmq.eventloop.future import Context as FutureContext
import tornado.ioloop

def worker(ctx):
    socket = ctx.socket(zmq.PUSH)
    socket.bind('inproc://worker')

    while True:
        # Do some work
        time.sleep(2)
        socket.send_pyobj("Work done")

async def mainLoop(ctx):
    socket = ctx.socket(zmq.PULL)
    socket.connect('inproc://worker')

    while True:
        #print(socket.recv_pyobj())
        print(await socket.recv_pyobj()) 


normalCtx = zmq.Context()
futureCtx = FutureContext()

t = Thread(target=worker, kwargs=dict(ctx=normalCtx))
t.start()

# wait for bind to be effective
time.sleep(4)

io_loop = tornado.ioloop.IOLoop.current()
io_loop.spawn_callback(mainLoop, ctx=futureCtx)
io_loop.start()

In the example, the PULL socket won't receive messages since it is issued on a different context than the worker socket. The example works fine if I use the normalCtx in both threads (removing await). It also works fine when using TCP as the transport protocol.

The solutions to get it working I found are:

  1. Use normal context and give up the async/await.
  2. Use future context and run an ioloop in each worker.
  3. Use TCP as the transport protocol.

My question is if there is a magic trick to get it working with async/await, inproc and not having to run ioloops in the workers, e.g. by accessing the future context in a non-future way?

Upvotes: 2

Views: 111

Answers (0)

Related Questions