Mercado

Reputation: 606

multiprocessing.Process and asyncio loop communication

import asyncio
from multiprocessing import Queue, Process
import time

task_queue = Queue()

# This is simulating the task
async def do_task(task_number):
  for progress in range(task_number):
    print(f'{progress}/{task_number} doing')
    await asyncio.sleep(10)

# This is the loop that accepts and runs tasks
async def accept_tasks():
  event_loop = asyncio.get_event_loop()
  while True:
    task_number = task_queue.get()  # <-- this blocks the event loop from running do_task()
    event_loop.create_task(do_task(task_number))

# This is the starting point of the process,
# the event loop runs here
def worker():
  event_loop = asyncio.get_event_loop()
  event_loop.run_until_complete(accept_tasks())

# Run a new process
Process(target=worker).start()

# Simulate adding tasks every 1 second
for _ in range(1,50):
  task_queue.put(_)
  print('added to queue', _)
  time.sleep(1)

I'm trying to run a separate process that runs an event loop to do I/O operations. Now, from the parent process, I'm trying to "queue in" tasks. The problem is that do_task() never runs. The only solution that works is polling (i.e. checking whether the queue is empty, then sleeping for X seconds), along the lines of the sketch below.
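To be clear, the polling version that does work looks something like this (the 1-second interval is just an example):

# Polling workaround (illustrative): drain the queue, then sleep so the
# event loop gets a chance to run the do_task() coroutines.
async def accept_tasks():
  event_loop = asyncio.get_event_loop()
  while True:
    while not task_queue.empty():
      event_loop.create_task(do_task(task_queue.get()))
    await asyncio.sleep(1)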

After some researching, the problem seems to be that task_queue.get() isn't doing event-loop-friendly IO.

aiopipe provides a solution, but assumes both processes are running in an event loop.

I tried creating this, but the consumer isn't consuming anything:

read_fd, write_fd = os.pipe()
consumer = AioPipeReader(read_fd)
producer = os.fdopen(write_fd, 'w')

Upvotes: 1

Views: 3806

Answers (2)

user4815162342

Reputation: 154836

A simple workaround for this situation is to change task_number = task_queue.get() to task_number = await event_loop.run_in_executor(None, task_queue.get). That way the blocking Queue.get() call is off-loaded to a thread pool and the current coroutine is suspended, like a good asyncio citizen. Once the thread pool finishes with the call, the coroutine resumes execution.
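Applied to the accept_tasks coroutine from the question, the change would look roughly like this:

# Sketch of the suggested change: the blocking Queue.get() runs in the
# default thread pool, so the event loop stays free to run do_task().
async def accept_tasks():
  event_loop = asyncio.get_event_loop()
  while True:
    task_number = await event_loop.run_in_executor(None, task_queue.get)
    event_loop.create_task(do_task(task_number))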

This approach is a workaround because it doesn't scale to a large number of concurrent tasks: each blocking call "turned async" that way takes up a slot in the thread pool, and calls that exceed the pool's maximum number of workers won't even start executing until a thread frees up. For example, rewriting all of asyncio to call blocking functions through run_in_executor would just result in a badly written threaded system. However, if you know that you have a small number of child processes, using run_in_executor is correct and can solve the problem very effectively.

Upvotes: 6

Mercado

Reputation: 606

I finally figured it out. There is a known way to do this with the aiopipe library, but it's made to run on two event loops in two different processes. In my case, only the child process runs an event loop. To solve that, I changed the writing side to an unbuffered plain write using os.fdopen(write_fd, 'wb', 0).

Here is the code without any library.

import asyncio
from asyncio import StreamReader, StreamReaderProtocol
from multiprocessing import Process
import time
import os

# This is simulating the task
async def do_task(task_number):
  for progress in range(task_number):
    print(f'{progress}/{task_number} doing')
    await asyncio.sleep(1)

# This is the loop that accepts and runs tasks
async def accept_tasks(read_fd):
  loop = asyncio.get_running_loop()
  # Setup asynchronous reading
  reader = StreamReader()
  protocol = StreamReaderProtocol(reader)
  transport, _ = await loop.connect_read_pipe(
    lambda: protocol, os.fdopen(read_fd, 'rb', 0))

  while True:
    task_number = int(await reader.readline())
    await asyncio.sleep(1)
    loop.create_task(do_task(task_number))

  transport.close()

# This is the starting point of the process,
# the event loop runs here
def worker(read_fd):
  loop = asyncio.get_event_loop()
  loop.run_until_complete(accept_tasks(read_fd))

# Create read and write pipe
read_fd, write_fd = os.pipe()

# allow inheritance to child
os.set_inheritable(read_fd, True)
Process(target=worker, args=(read_fd, )).start()
# detach from parent
os.close(read_fd)

writer = os.fdopen(write_fd, 'wb', 0)
# Simulate adding tasks every 1 second
for _ in range(1,50):
  writer.write((f'{_}\n').encode())
  print('added to queue', _)
  time.sleep(1)

Basically, we use asynchronous reading on the child process's end and unbuffered synchronous writes on the parent process's end. To do the former, you need to connect the event loop to the pipe as shown in the accept_tasks coroutine.
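One detail that matters: the write end is opened in binary mode with buffering disabled (os.fdopen(write_fd, 'wb', 0)), so every write() goes straight to the pipe and the child's readline() sees each complete line immediately instead of waiting for a buffer to fill.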

Upvotes: 0
