Reputation: 46533
I'm using Python's asyncio module and async/await to process a character sequence in chunks concurrently and collect the results in a list. For that I'm using a chunker function (split) and a chunk-processing function (process_chunk). They both come from a third-party library, and I would prefer not to change them.
Chunking is slow, and the number of chunks is not known up front, which is why I don't want to consume the whole chunk generator at once. Ideally, the code should advance the generator in sync with process_chunk's semaphore, i.e., every time that function returns.
My code
import asyncio


def split(sequence):
    for x in sequence:
        print('Getting the next chunk:', x)
        yield x
    print('Finished chunking')


async def process_chunk(chunk, *, semaphore=asyncio.Semaphore(2)):
    async with semaphore:
        print('Processing chunk:', chunk)
        await asyncio.sleep(3)
        return 'OK'


async def process_in_chunks(sequence):
    gen = split(sequence)
    coros = [process_chunk(chunk) for chunk in gen]
    results = await asyncio.gather(*coros)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(process_in_chunks('ABC'))
kind of works and prints
Getting the next chunk: A
Getting the next chunk: B
Getting the next chunk: C
Finished chunking
Processing chunk: C
Processing chunk: B
Processing chunk: A
although that means that the gen generator is exhausted before the processing begins. I know why it happens, but how do I change that?
Upvotes: 3
Views: 2109
Reputation: 13415
If you don't mind having an external dependency, you can use aiostream.stream.map:
from aiostream import stream, pipe


async def process_in_chunks(sequence):
    # Asynchronous sequence of chunks
    xs = stream.iterate(split(sequence))
    # Asynchronous sequence of results
    ys = xs | pipe.map(process_chunk, task_limit=2)
    # Aggregation of the results into a list
    zs = ys | pipe.list()
    # Run the stream
    results = await zs
    print(results)
The chunks are generated lazily and fed to the process_chunk coroutine. The number of coroutines running concurrently is controlled by task_limit. That means the semaphore in process_chunk is no longer necessary.
Output:
Getting the next chunk: A
Processing chunk: A
Getting the next chunk: B
Processing chunk: B
# Pause 3 seconds
Getting the next chunk: C
Processing chunk: C
Finished chunking
# Pause 3 seconds
['OK', 'OK', 'OK']
See more examples in this demonstration and the documentation.
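Since task_limit already bounds the concurrency, process_chunk could be simplified accordingly if you do control it (a sketch, not the third-party code from the question):

```python
import asyncio


async def process_chunk(chunk):
    # Concurrency is bounded by aiostream's task_limit,
    # so the default-argument semaphore is no longer needed here
    print('Processing chunk:', chunk)
    await asyncio.sleep(3)
    return 'OK'
```

Dropping the semaphore also avoids the default-argument pitfall: the original `asyncio.Semaphore(2)` is created once at import time, which can tie it to the wrong event loop on some Python versions.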
Upvotes: 5
Reputation: 39546
You can use next to iterate through gen manually.
import asyncio


# third-party:
def split(sequence):
    for x in sequence:
        print('Getting the next chunk:', x)
        yield x
    print('Finished chunking')


async def process_chunk(chunk, *, semaphore=asyncio.Semaphore(2)):
    async with semaphore:
        print('Processing chunk:', chunk)
        await asyncio.sleep(3)
        return 'OK'


# our code:
sem = asyncio.Semaphore(2)  # let's use our semaphore


async def process_in_chunks(sequence):
    tasks = []
    gen = split(sequence)
    while True:
        await sem.acquire()
        try:
            chunk = next(gen)
        except StopIteration:
            break
        else:
            task = asyncio.ensure_future(process_chunk(chunk))  # task to run concurrently
            task.add_done_callback(lambda *_: sem.release())  # allow next chunks to be processed
            tasks.append(task)
    await asyncio.gather(*tasks, return_exceptions=True)  # await all pending tasks
    results = [task.result() for task in tasks]
    return results


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(process_in_chunks('ABCDE'))
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
Output:
Getting the next chunk: A
Getting the next chunk: B
Processing chunk: A
Processing chunk: B
Getting the next chunk: C
Getting the next chunk: D
Processing chunk: C
Processing chunk: D
Getting the next chunk: E
Finished chunking
Processing chunk: E
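For comparison (not part of the original answer), the done-callback can be avoided by releasing the semaphore in a small wrapper coroutine instead; `guarded` below is a hypothetical helper name, and the toy `split`/`process_chunk` stand in for the third-party functions:

```python
import asyncio


def split(sequence):
    for x in sequence:
        yield x


async def process_chunk(chunk):
    await asyncio.sleep(0.1)
    return 'OK'


async def process_in_chunks(sequence, limit=2):
    sem = asyncio.Semaphore(limit)

    async def guarded(chunk):
        # The semaphore is released right here, when processing finishes,
        # so no add_done_callback is needed
        try:
            return await process_chunk(chunk)
        finally:
            sem.release()

    tasks = []
    gen = split(sequence)
    while True:
        await sem.acquire()  # block before pulling the next chunk
        try:
            chunk = next(gen)
        except StopIteration:
            sem.release()
            break
        tasks.append(asyncio.ensure_future(guarded(chunk)))
    return await asyncio.gather(*tasks)
```

Keeping acquire/release in one coroutine makes the pairing easier to audit than a lambda callback, at the cost of one extra wrapper per chunk.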
Upvotes: 4