Gary van der Merwe

Reputation: 9533

Built-in way to transform an asynchronous iterable into a synchronous iterable (list)

Python 3.6 now has asynchronous iterables. Is there a built-in way to transform an asynchronous iterable into a synchronous iterable?

I currently have this helper function, but it feels very un-pythonic. Is there a better way to do this?

async def aiter_to_list(aiter):
    l = []
    async for i in aiter:
        l.append(i)
    return l

Upvotes: 12

Views: 8508

Answers (4)

supermodo

Reputation: 803

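From Python 3.6 you can use asynchronous comprehensions. They are only valid inside an async def function, so the comprehension is wrapped in a coroutine here:

import asyncio

async def async_iter():
    for i in range(0, 5):
        yield i

async def main():
    # async comprehension
    sync_list = [gen async for gen in async_iter()]
    print(sync_list)  # [0, 1, 2, 3, 4]

# asyncio.run needs Python 3.7+; on 3.6 use
# asyncio.get_event_loop().run_until_complete(main())
asyncio.run(main())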
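The same syntax also accepts a filter clause and works for dict comprehensions, as long as it sits inside a coroutine function. A minimal sketch, assuming Python 3.7+ for asyncio.run; the names numbers and collect are made up for this illustration and are not part of the answer above:

```python
import asyncio

async def numbers():
    # Hypothetical async generator, used only for this illustration.
    for i in range(5):
        await asyncio.sleep(0)  # hand control back to the event loop
        yield i

async def collect():
    as_list = [n async for n in numbers()]
    evens = [n async for n in numbers() if n % 2 == 0]  # with a filter
    squares = {n: n * n async for n in numbers()}       # dict comprehension
    return as_list, evens, squares

as_list, evens, squares = asyncio.run(collect())
print(as_list)
print(evens)
print(squares)
```

Each comprehension still awaits the items one by one; the result is an ordinary list or dict once the coroutine returns.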

Upvotes: 22

Cristian Garcia

Reputation: 9859

These functions convert in both directions between sync iterables and async iterables, not just to simple lists.

Basic imports

import asyncio
import threading
import time

DONE = object()
TIMEOUT = 0.001

The function to_sync_iterable will convert any async iterable to a sync iterable:

def to_sync_iterable(async_iterable, maxsize = 0):

    def sync_iterable():

        queue = asyncio.Queue(maxsize=maxsize)
        loop = asyncio.get_event_loop()

        t = threading.Thread(target=_run_coroutine, args=(loop, async_iterable, queue))
        t.daemon = True
        t.start()

        while True:
            if not queue.empty():
                x = queue.get_nowait()

                if x is DONE:
                    break
                else:
                    yield x
            else:
                time.sleep(TIMEOUT)

        t.join()

    return sync_iterable()

def _run_coroutine(loop, async_iterable, queue):

    loop.run_until_complete(_consume_async_iterable(async_iterable, queue))

async def _consume_async_iterable(async_iterable, queue):

    async for x in async_iterable:
        await queue.put(x)

    await queue.put(DONE)

You can use it like this:

async def slow_async_generator():
    yield 0

    await asyncio.sleep(1)
    yield 1

    await asyncio.sleep(1)
    yield 2

    await asyncio.sleep(1)
    yield 3


for x in to_sync_iterable(slow_async_generator()):
    print(x)
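One caveat: asyncio.Queue is documented as not thread-safe, so polling it from a second thread relies on timing. A possible alternative, sketched here under the assumption of Python 3.7+, hands items over through the thread-safe queue.Queue and runs the async iterable on a private event loop in a worker thread. The names iter_in_thread, drain and ticker are hypothetical and not part of the answer above:

```python
import asyncio
import queue
import threading

_DONE = object()  # sentinel marking the end of the stream

def iter_in_thread(async_iterable, maxsize=0):
    """Yield items from async_iterable, which runs on its own loop in a worker thread."""
    q = queue.Queue(maxsize=maxsize)  # thread-safe hand-off between the two threads

    async def drain():
        try:
            async for item in async_iterable:
                q.put(item)  # may block the worker's private loop when the queue is full
        finally:
            q.put(_DONE)  # always signal the end, even if the iterable raised

    # asyncio.run creates and closes a fresh event loop inside the worker thread.
    worker = threading.Thread(target=lambda: asyncio.run(drain()), daemon=True)
    worker.start()
    while True:
        item = q.get()  # blocks the calling thread until an item is available
        if item is _DONE:
            break
        yield item
    worker.join()

async def ticker():
    # Hypothetical async generator for the usage example.
    for i in range(3):
        await asyncio.sleep(0.01)
        yield i

result = list(iter_in_thread(ticker()))
print(result)
```

This sketch does not forward exceptions from the async iterable to the consumer; the stream simply ends early in that case. It also only suits iterables that are not tied to an event loop that is already running elsewhere.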

The function to_async_iterable will convert any sync iterable to an async iterable:

def to_async_iterable(iterable, maxsize = 0):

    async def async_iterable():
        queue = asyncio.Queue(maxsize=maxsize)
        loop = asyncio.get_event_loop()
        task = loop.run_in_executor(None, lambda: _consume_iterable(loop, iterable, queue))

        while True:
            x = await queue.get()

            if x is DONE:
                break
            else:
                yield x

        await task


    return async_iterable()

def _consume_iterable(loop, iterable, queue):

    for x in iterable:
        while True:
            if not queue.full():
                loop.call_soon_threadsafe(queue.put_nowait, x)
                break
            else:
                time.sleep(TIMEOUT)

    while True:
        if not queue.full():
            loop.call_soon_threadsafe(queue.put_nowait, DONE)
            break
        else:
            time.sleep(TIMEOUT)

This one is especially useful for asyncio programs because it won't block the event loop even if the sync iterable blocks. You can use it like this:

def slow_sync_generator():
    yield 0

    time.sleep(1)
    yield 1

    time.sleep(1)
    yield 2

    time.sleep(1)
    yield 3

async def async_task():
    async for x in to_async_iterable(slow_sync_generator()):
        print(x)

asyncio.get_event_loop().run_until_complete(async_task())
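A shorter variant of the same idea is to fetch each item with next() in the default executor, so no queue or polling loop is needed. This is only a sketch, assuming Python 3.7+; iterate_in_executor and slow_numbers are hypothetical names. The sentinel is passed as the default argument of next() because a StopIteration cannot be propagated through a future:

```python
import asyncio
import time

_END = object()  # returned by next() once the iterator is exhausted

async def iterate_in_executor(iterable):
    """Async generator that pulls each item from a blocking iterable in a worker thread."""
    loop = asyncio.get_running_loop()
    it = iter(iterable)
    while True:
        # next(it, _END) runs in the default thread pool, so a slow
        # iterator does not block the event loop.
        item = await loop.run_in_executor(None, next, it, _END)
        if item is _END:
            break
        yield item

def slow_numbers():
    # Hypothetical blocking generator for the usage example.
    for i in range(3):
        time.sleep(0.01)
        yield i

async def main():
    return [x async for x in iterate_in_executor(slow_numbers())]

result = asyncio.run(main())
print(result)
```

The trade-off is one executor round trip per item and no read-ahead buffering, whereas the queue-based version above lets the producer run ahead of the consumer.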

Upvotes: 1

Vincent

Reputation: 13415

You can use aiostream.stream.list:

import asyncio

from aiostream import stream

async def agen():
    yield 1
    yield 2
    yield 3

async def main():
    lst = await stream.list(agen())
    print(lst)  # prints [1, 2, 3]

asyncio.run(main())  # Python 3.7+

More operators and examples in the documentation.

Upvotes: 5

deceze

Reputation: 522042

Your "asynchronous to synchronous" helper is itself asynchronous; not a big change at all. In general: no, you cannot make something asynchronous synchronous. An asynchronous value will be supplied "sometime later"; you cannot make that into "now" because the value doesn't exist "now" and you will have to wait for it, asynchronously.
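In practice, the only way for synchronous code to get the finished list is to block while an event loop runs the coroutine to completion. A minimal sketch, assuming Python 3.7+ for asyncio.run and that no event loop is already running in the calling thread; agen is a hypothetical generator added for the example:

```python
import asyncio

async def aiter_to_list(aiter):
    # Same job as the helper in the question, written as an async comprehension.
    return [item async for item in aiter]

async def agen():
    # Hypothetical async generator for the usage example.
    for i in range(3):
        await asyncio.sleep(0)
        yield i

# The synchronous caller waits here until every item has been produced.
items = asyncio.run(aiter_to_list(agen()))
print(items)
```

The waiting has not gone away; it has only moved into the asyncio.run call, which returns after the whole iterable is exhausted.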

Upvotes: 2
