Dmitry Fink

Reputation: 1062

Translating async generator into sync one

Imagine we have an original API that returns a generator (it is really a mechanism that fetches pages/chunks of results from a server while presenting a simple generator to the user, who can iterate over the results one by one). For simplicity:

# Original sync generator
def get_results():
     # fetch from server
     yield 1
     yield 2
     # fetch next page
     yield 3
     yield 4
     # ....

Now we need to implement an asyncio version of the API while keeping the old API operational. This is where things get complicated: we essentially want to translate an async generator into a sync one, but I can't find an elegant way to do that. The best I could make work so far is "fetch all results into a list first, then provide a fake sync generator over that list", which rather defeats the purpose:

# Async generator
async def get_results_async():
     # await fetch from server
     yield 1
     yield 2
     # await fetch next page
     yield 3
     yield 4
     # ....


# Backward compatible sync generator
def get_results():

    async def gather_all_results():
        res = []
        async for i in get_results_async():
            res.append(i)
        return res

    res = asyncio.run(gather_all_results())
    for i in res:
        yield i

Is there a better, more elegant way to do that without fetching all the results before returning them?

Thanks

Upvotes: 6

Views: 3463

Answers (3)

Martin Ueding

Reputation: 8699

The problem with the accepted answer is that it stops working once the async generator itself depends on other async code. asyncio.run(...) creates a new event loop every time it is called, so asyncio.run(anext(...)) runs anext in a fresh event loop until anext returns. If your async generator calls other coroutine functions (async def), those calls are scheduled on that event loop. But when anext(...) returns, asyncio.run(...) returns as well and destroys the event loop, which leaves the intermediate results hanging.
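To make the fresh-loop behaviour concrete, here is a minimal sketch (the helper name current_loop is made up for illustration). It only demonstrates that two consecutive asyncio.run(...) calls run on two different event loops, and that each loop is already closed by the time asyncio.run(...) returns:

```python
import asyncio


async def current_loop() -> asyncio.AbstractEventLoop:
    # Hypothetical helper: hand back the loop this coroutine is running on.
    return asyncio.get_running_loop()


first = asyncio.run(current_loop())
second = asyncio.run(current_loop())

print(first is second)     # False: each call created its own loop
print(first.is_closed())   # True: the loop was closed when asyncio.run returned
print(second.is_closed())  # True
```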

Take a look at the following minimal demonstrator code that uses the accepted answer's code:

import asyncio
from collections.abc import AsyncIterator, Iterator
from typing import TypeVar


T = TypeVar("T")


def async_iterable_to_sync_iterable(iterator: AsyncIterator[T]) -> Iterator[T]:
    try:
        while True:
            result = asyncio.run(anext(iterator))
            print(f"🟡 async_iterable_to_sync_iterable: {result=}")
            yield result
    except StopAsyncIteration:
        print("🟡 async_iterable_to_sync_iterable: StopAsyncIteration")


async def dummy_relay(iterator: AsyncIterator[T]) -> AsyncIterator[T]:
    async for elem in iterator:
        print(f"🟡 dummy_relay: Relaying {elem} …")
        yield elem
    print(f"🟡 dummy_relay: Done.")


async def generate_numbers() -> AsyncIterator[int]:
    for i in range(3):
        print(f"🟡 generate_numbers: Yielding {i} …")
        yield i
    print(f"🟡 generate_numbers: Done.")


print(list(async_iterable_to_sync_iterable(generate_numbers())))
print()
print(list(async_iterable_to_sync_iterable(dummy_relay(generate_numbers()))))

In the output we only have this:

🟡 generate_numbers: Yielding 0 …
🟡 async_iterable_to_sync_iterable: result=0
🟡 generate_numbers: Yielding 1 …
🟡 async_iterable_to_sync_iterable: result=1
🟡 generate_numbers: Yielding 2 …
🟡 async_iterable_to_sync_iterable: result=2
🟡 generate_numbers: Done.
🟡 async_iterable_to_sync_iterable: StopAsyncIteration
[0, 1, 2]

🟡 generate_numbers: Yielding 0 …
🟡 dummy_relay: Relaying 0 …
🟡 async_iterable_to_sync_iterable: result=0
🟡 dummy_relay: Done.
🟡 async_iterable_to_sync_iterable: StopAsyncIteration
[0]

As you can see, in the second run generate_numbers never finishes: only the first element makes it through dummy_relay.

In order to fix this, one needs to persist the event loop for a while. In Python ≥ 3.11, use with asyncio.Runner() to keep the event loop active during the whole duration of async_iterable_to_sync_iterable:

def async_iterable_to_sync_iterable(iterator: AsyncIterator[T]) -> Iterator[T]:
    with asyncio.Runner() as runner:
        try:
            while True:
                result = runner.run(anext(iterator))
                print(f"🟡 async_iterable_to_sync_iterable: {result=}")
                yield result
        except StopAsyncIteration:
            print("🟡 async_iterable_to_sync_iterable: StopAsyncIteration")

And then the output becomes what we expect:

🟡 generate_numbers: Yielding 0 …
🟡 dummy_relay: Relaying 0 …
🟡 async_iterable_to_sync_iterable: result=0
🟡 generate_numbers: Yielding 1 …
🟡 dummy_relay: Relaying 1 …
🟡 async_iterable_to_sync_iterable: result=1
🟡 generate_numbers: Yielding 2 …
🟡 dummy_relay: Relaying 2 …
🟡 async_iterable_to_sync_iterable: result=2
🟡 generate_numbers: Done.
🟡 dummy_relay: Done.
🟡 async_iterable_to_sync_iterable: StopAsyncIteration
[0, 1, 2]

Upvotes: 3

Ryan Patterson

Reputation: 627

The accepted answer has the disadvantage of only working outside of any async code: if you call one of the legacy sync methods from an async method, it will raise a RuntimeError because an event loop is already running.
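A small sketch of that failure mode, assuming a legacy wrapper that relies on asyncio.run internally (legacy_sync_call is a made-up name, and asyncio.sleep(0) stands in for the real work):

```python
import asyncio


def legacy_sync_call():
    # Hypothetical legacy entry point that internally relies on asyncio.run.
    coro = asyncio.sleep(0)
    try:
        return asyncio.run(coro)
    except RuntimeError as exc:
        coro.close()  # the coroutine never started; close it to avoid a warning
        return exc


async def main():
    # Calling the sync wrapper while an event loop is already running.
    return legacy_sync_call()


outcome = asyncio.run(main())
print(type(outcome).__name__)  # RuntimeError
```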

Another approach with a different set of trade-offs is to do the work in a background thread. There is asgiref, which provides an asgiref.sync.AsyncToSync adapter that implements this behavior, but it's possible to implement a simplified version yourself. The following allows calling async methods from sync code. Caveats: sync calls will block all async tasks while the method runs, and code which is thread-sensitive will not work properly (because this adapter uses a background thread).

import asyncio
import queue
import threading
from collections.abc import AsyncIterator, Coroutine, Generator
from typing import TypeVar

T = TypeVar("T")

def async_to_sync(coroutine: Coroutine):
    "Run an async method from sync code."
    q = queue.Queue(maxsize=1)

    async def threadmain():
        try:
            q.put((True, await coroutine))
        except BaseException as ex:
            q.put((False, ex))

    thread = threading.Thread(target=asyncio.run, args=(threadmain(),), daemon=True)
    thread.start()
    success, result = q.get()
    if success:
        return result
    else:
        raise result


def aiter_to_iter(it: AsyncIterator[T]) -> Generator[T, None, None]:
    "Convert an async iterator into a regular (sync) iterator."
    q_in = queue.SimpleQueue()
    q_out = queue.SimpleQueue()

    async def threadmain():
        try:
            # Wait until the sync generator requests an item before continuing
            while q_in.get():
                q_out.put((True, await anext(it)))
        except StopAsyncIteration:
            q_out.put((False, None))
        except BaseException as ex:
            q_out.put((False, ex))

    thread = threading.Thread(target=asyncio.run, args=(threadmain(),), daemon=True)
    thread.start()

    try:
        while True:
            q_in.put(True)
            cont, result = q_out.get()
            if cont:
                yield result
            elif result is None:
                break
            else:
                raise result
    finally:
        q_in.put(False)

Upvotes: 0

Keijack

Reputation: 868

Because asyncio is contagious, it's hard to write elegant code that integrates asyncio code into existing sync code. For the scenario above, the following code is a little better, but I don't think it's elegant enough.

async def get_results_async():
    # await fetch from server
    yield 1
    yield 2
    # await fetch next page
    yield 3
    yield 4
    # ....


# Backward compatible sync generator
def get_results():
    gen = get_results_async()
    while True:
        try:
            yield asyncio.run(gen.__anext__())
        except StopAsyncIteration:
            break 

You can also reuse a single event loop instead of creating a new one for every item.

async def get_results_async():
    # await fetch from server
    yield 1
    yield 2
    # await fetch next page
    yield 3
    yield 4
    # ....

# A loop that you create once and keep somewhere.
# (asyncio.get_event_loop() is deprecated when no loop is running.)
loop = asyncio.new_event_loop()

# Backward compatible sync generator
def get_results():
    gen = get_results_async()
    while True:
        try:
            yield loop.run_until_complete(gen.__anext__())
        except StopAsyncIteration:
            break 

Upvotes: 7
