vaultah

Reputation: 46533

"Lazy" version of asyncio.gather?

I'm using Python's asyncio module and async/await to process a character sequence in chunks concurrently and collect the results in a list. For that I'm using a chunker function (split) and a chunk processing function (process_chunk). They both come from a third-party library, and I would prefer not to change them.

Chunking is slow, and the number of chunks is not known up front, which is why I don't want to consume the whole chunk generator at once. Ideally, the code should advance the generator in sync with process_chunk's semaphore, i.e. fetch the next chunk every time that function returns.

My code

import asyncio

def split(sequence):
    for x in sequence:
        print('Getting the next chunk:', x)
        yield x
    print('Finished chunking')

async def process_chunk(chunk, *, semaphore=asyncio.Semaphore(2)):
    async with semaphore:
        print('Processing chunk:', chunk)
        await asyncio.sleep(3)
        return 'OK'

async def process_in_chunks(sequence):
    gen = split(sequence)
    coro = [process_chunk(chunk) for chunk in gen]
    results = await asyncio.gather(*coro)
    return results

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(process_in_chunks('ABC'))

kind of works and prints

Getting the next chunk: A
Getting the next chunk: B
Getting the next chunk: C
Finished chunking
Processing chunk: C
Processing chunk: B
Processing chunk: A

although that means that the gen generator is exhausted before the processing begins. I know why it happens, but how do I change that?

Upvotes: 3

Views: 2109

Answers (2)

Vincent

Reputation: 13415

If you don't mind having an external dependency, you can use aiostream.stream.map:

from aiostream import stream, pipe

async def process_in_chunks(sequence):
    # Asynchronous sequence of chunks
    xs = stream.iterate(split(sequence))
    # Asynchronous sequence of results
    ys = xs | pipe.map(process_chunk, task_limit=2)
    # Aggregation of the results into a list
    zs = ys | pipe.list()
    # Run the stream
    results = await zs
    print(results)

The chunks are generated lazily and fed to the process_chunk coroutine. The number of coroutines running concurrently is controlled by task_limit, so the semaphore in process_chunk is no longer necessary.

Output:

Getting the next chunk: A
Processing chunk: A
Getting the next chunk: B
Processing chunk: B
# Pause 3 seconds
Getting the next chunk: C
Processing chunk: C
Finished chunking
# Pause 3 seconds
['OK', 'OK', 'OK']

See more examples in this demonstration and the documentation.
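
For completeness, the stream can be driven the same way as the code in the question; a minimal runner sketch, assuming split and process_chunk are defined as there:

import asyncio

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(process_in_chunks('ABC'))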

Upvotes: 5

Mikhail Gerasimov

Reputation: 39546

  • Use next to iterate through gen manually
  • Acquire the semaphore before getting and processing a chunk
  • Release the semaphore after the chunk has been processed


import asyncio


# third-party:
def split(sequence):
    for x in sequence:
        print('Getting the next chunk:', x)
        yield x
    print('Finished chunking')


async def process_chunk(chunk, *, semaphore=asyncio.Semaphore(2)):
    async with semaphore:
        print('Processing chunk:', chunk)
        await asyncio.sleep(3)
        return 'OK'


# our code:
sem = asyncio.Semaphore(2)  # let's use our semaphore


async def process_in_chunks(sequence):
    tasks = []
    gen = split(sequence)
    while True:
        await sem.acquire()
        try:
            chunk = next(gen)
        except StopIteration:
            break
        else:
            task = asyncio.ensure_future(process_chunk(chunk))  # task to run concurrently
            task.add_done_callback(lambda *_: sem.release())  # allow the next chunk to be processed
            tasks.append(task)
    await asyncio.gather(*tasks, return_exceptions=True)  # await all pending tasks
    results = [task.result() for task in tasks]
    return results


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(process_in_chunks('ABCDE'))
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

Output:

Getting the next chunk: A
Getting the next chunk: B
Processing chunk: A
Processing chunk: B
Getting the next chunk: C
Getting the next chunk: D
Processing chunk: C
Processing chunk: D
Getting the next chunk: E
Finished chunking
Processing chunk: E
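
The same acquire-before-next pattern generalizes to a small reusable helper. This is only a sketch; gather_lazily is a hypothetical name, not part of asyncio:

import asyncio

async def gather_lazily(gen, coro_func, limit):
    # Pull the next item from gen only once a semaphore slot
    # is free, mirroring the while-loop above.
    sem = asyncio.Semaphore(limit)
    tasks = []
    while True:
        await sem.acquire()
        try:
            item = next(gen)
        except StopIteration:
            sem.release()
            break
        task = asyncio.ensure_future(coro_func(item))
        task.add_done_callback(lambda *_: sem.release())
        tasks.append(task)
    await asyncio.gather(*tasks, return_exceptions=True)
    return [task.result() for task in tasks]

Usage would then be results = await gather_lazily(split(sequence), process_chunk, 2).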

Upvotes: 4
