Anake

Reputation: 7669

How to asynchronously process an async generator

I have an async generator that continuously yields new data, and I want to schedule processing for each item as soon as it arrives. So far, however, I can't find a way to use async for without awaiting the processing on each iteration. Below is a cut-down example with a dummy async_generator; the one I actually use comes from an external library (aiokafka.AIOKafkaConsumer).

import asyncio


async def async_generator():
    """only an example, really a kafka consumer"""
    await asyncio.sleep(0.1)
    print("submitted record")
    yield 42
    await asyncio.sleep(0.1)
    print("submitted record")
    yield 43


async def process_record():
    await asyncio.sleep(1)
    print("record processed")


async def my_f():
    async for _ in async_generator():
        await process_record()
        print("record processing submitted")


if __name__ == "__main__":
    # asyncio.run creates the event loop, runs my_f to completion and closes the loop
    asyncio.run(my_f())

I want the output of this to be:

submitted record
record processing submitted
submitted record
record processing submitted
record processed
record processed

But instead I get the following, because I have to await process_record():

submitted record
record processed
record processing submitted
submitted record
record processed
record processing submitted

I don't think I can use asyncio.gather, because the async_generator produces data continuously, so a gather placed after the async for would never be reached.

Upvotes: 5

Views: 3883

Answers (1)

Anake

Reputation: 7669

I was mistaken about how asyncio.create_task works: it schedules the task to run but does not wait for it to complete. This means you can start several tasks that finish in the background. If the generator does complete, the await asyncio.gather(...) statement at the end makes sure that all of the started tasks have finished before continuing.
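To make the "starts but does not wait" behaviour concrete, here is a minimal sketch. The names worker and main and the events list are made up for illustration; they are not part of the question's code.

```python
import asyncio


async def worker(events, delay):
    # Hypothetical stand-in for process_record
    await asyncio.sleep(delay)
    events.append("worker done")


async def main():
    events = []
    # create_task schedules the coroutine and returns immediately
    task = asyncio.create_task(worker(events, 0.05))
    # This line runs before the worker has had a chance to finish
    events.append("scheduled")
    # gather suspends main until the task has completed
    await asyncio.gather(task)
    events.append("gathered")
    return events


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The "scheduled" entry is recorded before "worker done", which is the ordering the question asks for.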

Therefore my desired behaviour can be achieved by collecting the tasks in a list inside the async for and calling gather after the loop. Here is a working example:

import asyncio


async def async_generator():
    for x in range(10):
        await asyncio.sleep(0.5)
        print(f"{x} - submitted record")
        yield x


async def process_record(x):
    await asyncio.sleep(1)
    print(f"{x} - record processed")


async def my_f():
    tasks = []
    async for x in async_generator():
        tasks.append(asyncio.create_task(process_record(x)))
        print(f"{x} - record processing submitted")
    await asyncio.gather(*tasks)


if __name__ == "__main__":
    # asyncio.run creates the event loop, runs my_f to completion and closes the loop
    asyncio.run(my_f())

The first few lines of output are as follows. Note that each record finishes processing only after the next record has already been submitted.

0 - submitted record
0 - record processing submitted
1 - submitted record
1 - record processing submitted
0 - record processed
2 - submitted record
2 - record processing submitted
1 - record processed
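One caveat for a generator that really never ends, such as a long-lived consumer: the tasks list above keeps growing. A possible variation, sketched here under my own assumptions rather than tested against aiokafka, is to keep the tasks in a set and let each task remove itself when it finishes. The consume function and the results list are hypothetical names, and the generator is shortened so the example terminates.

```python
import asyncio


async def async_generator():
    # Stand-in for an endless source; kept short so the sketch terminates
    for x in range(5):
        await asyncio.sleep(0.01)
        yield x


async def process_record(x, results):
    await asyncio.sleep(0.03)
    results.append(x)


async def consume():
    results = []
    pending = set()
    async for x in async_generator():
        task = asyncio.create_task(process_record(x, results))
        # Hold a reference so the task is not garbage collected mid-flight
        pending.add(task)
        # Drop the reference once the task finishes, so the set stays small
        task.add_done_callback(pending.discard)
    # Only reached if the generator ends: wait for whatever is still running
    await asyncio.gather(*pending)
    return results


if __name__ == "__main__":
    print(asyncio.run(consume()))
```

This sketch does nothing about exceptions raised inside process_record; with a generator that never ends, they would need to be handled inside the task or in the done callback.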

Upvotes: 6
