MemoNick

Reputation: 511

Python - Combining multiprocessing and asyncio

I'm trying to combine multiprocessing with asyncio. The program has two main components - one which streams/generates content, and another that consumes it.

What I want to do is to create multiple processes in order to exploit multiple CPU cores - one for the stream listener/generator, another for the consumer, and a simple one to shut down everything when the consumer has stopped.

My approach so far has been to create the processes and start them. Each process creates an async task, and once all processes have started, I run the asyncio tasks. What I have so far (stripped down) is:

import asyncio
from multiprocessing import Process

def consume_task(loop, consumer):
    loop.create_task(consume_queue(consumer))

def stream_task(loop, listener, consumer):
    loop.create_task(create_stream(listener, consumer))

def shutdown_task(loop, consumer):
    loop.create_task(shutdown(consumer))

async def shutdown(consumer):
    print("Shutdown task created")
    while not consumer.is_stopped():
        print("No activity")
        await asyncio.sleep(5)
    print("Shutdown initiated")
    loop.stop()

async def create_stream(listener, consumer):
    stream = Stream(auth, listener)
    print("Stream created")
    stream.filter(track=KEYWORDS, is_async=True)
    await asyncio.sleep(EVENT_DURATION)
    print("Stream finished")
    consumer.stop()

async def consume_queue(consumer):
    await consumer.run()

loop = asyncio.get_event_loop()

p_stream = Process(target=stream_task, args=(loop, listener, consumer, ))
p_consumer = Process(target=consume_task, args=(loop, consumer, ))
p_shutdown = Process(target=shutdown_task, args=(loop, consumer, ))
p_stream.start()
p_consumer.start()
p_shutdown.start()

loop.run_forever()
loop.close()

The problem is that everything hangs (or blocks?) - none of the tasks actually run. My attempted fix was to change the first three functions to:

def consume_task(loop, consumer):
    loop.create_task(consume_queue(consumer))
    loop.run_forever()

def stream_task(loop, listener, consumer):
    loop.create_task(create_stream(listener, consumer))
    loop.run_forever()

def shutdown_task(loop, consumer):
    loop.create_task(shutdown(consumer))
    loop.run_forever()

This does actually run. However, the consumer and the listener objects are not able to communicate. As a simple example, when the create_stream function calls consumer.stop(), the consumer does not stop. Even when I change an attribute on the consumer, the change never shows up elsewhere - case in point, the shared queue remains empty. This is how I am creating the instances:

queue = Queue()
consumer = PrintConsumer(queue)
listener = QueuedListener(queue, max_time=EVENT_DURATION)

Please note that if I do not use processes, but only asyncio tasks, everything works as expected, so I do not think it's a reference issue:

loop = asyncio.get_event_loop()
stream_task(loop, listener, consumer)
consume_task(loop, consumer)
shutdown_task(loop, consumer)
loop.run_forever()
loop.close()
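
To narrow it down, here is a minimal sketch (with a hypothetical Flag class standing in for my consumer) that shows the same symptom in isolation - an attribute set from inside a child process never shows up in the parent:

from multiprocessing import Process

class Flag:
    def __init__(self):
        self.stopped = False

def stop_it(flag):
    flag.stopped = True                  # only changes this process's copy
    print("child sees:", flag.stopped)   # True

if __name__ == "__main__":
    flag = Flag()
    p = Process(target=stop_it, args=(flag,))
    p.start()
    p.join()
    print("parent sees:", flag.stopped)  # still False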

Is it because they are running in different processes? How should I go about fixing this issue, please?

Upvotes: 2

Views: 6758

Answers (1)

MemoNick

Reputation: 511

Found the problem! Multiprocessing gives each process its own copy of every instance, so changes made in one process are never seen by the others. The solution is to create a Manager, which keeps the shared instances in a single server process and hands out proxies to them.

EDIT [11/2/2020]:

import asyncio
from multiprocessing import Process, Manager

"""
These two functions will be created as separate processes.
"""
def task1(loop, shared_list):
    output = loop.run_until_complete(asyncio.gather(async1(shared_list)))

def task2(loop, shared_list):
    output = loop.run_until_complete(asyncio.gather(async2(shared_list)))

"""
These two functions will be called (in different processes) asynchronously.
"""
async def async1(shared_list):
    pass

async def async2(shared_list):
    pass

"""
Create the manager and start it up.
From this manager, also create a list that is shared by functions in different threads.
"""
manager = Manager()
manager.start()
shared_list = manager.list()

loop = asyncio.get_event_loop()  # the event loop (each forked child gets its own copy of it)

"""
Create two processes.
"""
process1 = Process(target=task1, args=(loop, shared_list, ))
process2 = Process(target=task2, args=(loop, shared_list, ))

"""
Start the two processes and wait for them to finish.
"""
process1.start()
process2.start()

process1.join()
process2.join()

"""
Clean up
"""
loop.close()
manager.shutdown()
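
Applied back to the original question, the queue and the consumer's stop flag can be handed out by the same manager, so every process talks to the same underlying objects. A rough sketch, using a plain producer/consumer pair (hypothetical names) rather than the real QueuedListener and PrintConsumer classes:

import time
from multiprocessing import Process, Manager

def producer(queue, stop_event):
    # The proxies route every call to the manager's server process,
    # so all processes see the same queue and the same event.
    for item in ("a", "b", "c"):
        queue.put(item)
    stop_event.set()

def consumer(queue, stop_event):
    while not (stop_event.is_set() and queue.empty()):
        if queue.empty():
            time.sleep(0.1)
        else:
            print("consumed:", queue.get())

if __name__ == "__main__":
    manager = Manager()
    shared_queue = manager.Queue()   # proxy to a queue living in the manager process
    stop_event = manager.Event()     # proxy to a shared stop flag

    p1 = Process(target=producer, args=(shared_queue, stop_event))
    p2 = Process(target=consumer, args=(shared_queue, stop_event))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    manager.shutdown()

In the question's code, that would mean building PrintConsumer and QueuedListener on top of manager.Queue() and a manager.Event() stop flag instead of plain instance attributes.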

Upvotes: 2
