Reputation: 1062
Imagine we have an original API that returns a generator (it is really a mechanism that fetches pages/chunks of results from a server while exposing a simple generator to the user, letting them iterate over these results one by one). For simplicity:
# Original sync generator
def get_results():
    # fetch from server
    yield 1
    yield 2
    # fetch next page
    yield 3
    yield 4
    # ....
Now there is a need to implement an asyncio version of the API, but we need to keep the old API operational as well. This is where things get complicated: we essentially want to turn an async generator into a sync one, but I can't find an elegant way to do that. The best I could make work so far is "fetch all results into a list first, then provide a fake sync generator over that list", which kind of defeats the purpose:
import asyncio

# Async generator
async def get_results_async():
    # await fetch from server
    yield 1
    yield 2
    # await fetch next page
    yield 3
    yield 4
    # ....

# Backward compatible sync generator
def get_results():
    async def gather_all_results():
        # Collects every item up front, so nothing is yielded until all pages are fetched
        res = []
        async for i in get_results_async():
            res.append(i)
        return res

    res = asyncio.run(gather_all_results())
    for i in res:
        yield i
Is there a better, more elegant way to do that without fetching all the results before returning them?
Thanks
Upvotes: 6
Views: 3463
Reputation: 8699
The problem with the accepted answer is that it can cease to work. Each call to asyncio.run(...) creates a new event loop, so asyncio.run(anext(...)) runs anext in a fresh event loop until anext returns. If your async generator calls other coroutine functions (async def), these calls are placed in that event loop. But when anext(...) returns, asyncio.run(...) returns as well and destroys the event loop, which leaves the intermediate results hanging.
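The "fresh event loop per call" behavior can be checked in isolation. This is only a small sketch; current_loop is a hypothetical helper that is not part of the accepted answer:

```python
import asyncio


async def current_loop() -> asyncio.AbstractEventLoop:
    # Hypothetical helper: report which event loop is running this coroutine.
    return asyncio.get_running_loop()


first = asyncio.run(current_loop())
second = asyncio.run(current_loop())

print(first is second)  # False: every asyncio.run() call creates its own loop
print(first.is_closed(), second.is_closed())  # True True: each loop is closed on return
```

Anything that is still bound to the first loop when it is closed cannot simply be picked up again by the second one.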
Take a look at the following minimal demonstrator code that uses the accepted answer's code:
import asyncio
from collections.abc import AsyncIterator, Iterator
from typing import TypeVar

T = TypeVar("T")


def async_iterable_to_sync_iterable(iterator: AsyncIterator[T]) -> Iterator[T]:
    try:
        while True:
            # A new event loop is created (and destroyed) for every single item
            result = asyncio.run(anext(iterator))
            print(f"🟡 async_iterable_to_sync_iterable: {result=}")
            yield result
    except StopAsyncIteration:
        print(f"🟡 async_iterable_to_sync_iterable: StopAsyncIteration")


async def dummy_relay(iterator: AsyncIterator[T]) -> AsyncIterator[T]:
    async for elem in iterator:
        print(f"🟡 dummy_relay: Relaying {elem} …")
        yield elem
    print(f"🟡 dummy_relay: Done.")


async def generate_numbers() -> AsyncIterator[int]:
    for i in range(3):
        print(f"🟡 generate_numbers: Yielding {i} …")
        yield i
    print(f"🟡 generate_numbers: Done.")


print(list(async_iterable_to_sync_iterable(generate_numbers())))
print()
print(list(async_iterable_to_sync_iterable(dummy_relay(generate_numbers()))))
In the output we only have this:
🟡 generate_numbers: Yielding 0 …
🟡 async_iterable_to_sync_iterable: result=0
🟡 generate_numbers: Yielding 1 …
🟡 async_iterable_to_sync_iterable: result=1
🟡 generate_numbers: Yielding 2 …
🟡 async_iterable_to_sync_iterable: result=2
🟡 generate_numbers: Done.
🟡 async_iterable_to_sync_iterable: StopAsyncIteration
[0, 1, 2]
🟡 generate_numbers: Yielding 0 …
🟡 dummy_relay: Relaying 0 …
🟡 async_iterable_to_sync_iterable: result=0
🟡 dummy_relay: Done.
🟡 async_iterable_to_sync_iterable: StopAsyncIteration
[0]
As you can see, in the second run generate_numbers never finishes: only the first item is relayed.
In order to fix this, one needs to keep the event loop alive for longer. In Python ≥ 3.11, use with asyncio.Runner() to keep the event loop active for the whole duration of async_iterable_to_sync_iterable:
def async_iterable_to_sync_iterable(iterator: AsyncIterator[T]) -> Iterator[T]:
    # The runner owns a single event loop that stays open until the with block exits
    with asyncio.Runner() as runner:
        try:
            while True:
                result = runner.run(anext(iterator))
                print(f"🟡 async_iterable_to_sync_iterable: {result=}")
                yield result
        except StopAsyncIteration:
            print(f"🟡 async_iterable_to_sync_iterable: StopAsyncIteration")
And then the output becomes what we expect:
🟡 generate_numbers: Yielding 0 …
🟡 dummy_relay: Relaying 0 …
🟡 async_iterable_to_sync_iterable: result=0
🟡 generate_numbers: Yielding 1 …
🟡 dummy_relay: Relaying 1 …
🟡 async_iterable_to_sync_iterable: result=1
🟡 generate_numbers: Yielding 2 …
🟡 dummy_relay: Relaying 2 …
🟡 async_iterable_to_sync_iterable: result=2
🟡 generate_numbers: Done.
🟡 dummy_relay: Done.
🟡 async_iterable_to_sync_iterable: StopAsyncIteration
[0, 1, 2]
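On interpreters older than 3.11, where asyncio.Runner is not available, the same idea can be approximated by keeping one private event loop alive for the lifetime of the sync generator. The following is only a sketch under that assumption; iterate_with_private_loop, numbers and relay are hypothetical names used for illustration, and the function must not be called from inside an already running event loop:

```python
import asyncio
from collections.abc import AsyncIterator, Iterator
from typing import TypeVar

T = TypeVar("T")


def iterate_with_private_loop(iterator: AsyncIterator[T]) -> Iterator[T]:
    # Hypothetical helper: one loop serves every item instead of one loop per item.
    loop = asyncio.new_event_loop()
    try:
        while True:
            try:
                yield loop.run_until_complete(iterator.__anext__())
            except StopAsyncIteration:
                break
    finally:
        # Finalize the async generators this loop knows about, then close it.
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()


async def numbers() -> AsyncIterator[int]:
    for i in range(3):
        yield i


async def relay(source: AsyncIterator[T]) -> AsyncIterator[T]:
    async for item in source:
        yield item


print(list(iterate_with_private_loop(relay(numbers()))))  # [0, 1, 2]
```

The try/finally block plays the role of the with statement: the loop is closed when the generator is exhausted or when the consumer stops iterating and the generator is closed.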
Upvotes: 3
Reputation: 627
The accepted answer has the disadvantage of only working outside of any async code: if a legacy sync method that uses it is called from an async method, asyncio.run() raises a RuntimeError because an event loop is already running.
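A minimal sketch of that failure mode, assuming a hypothetical legacy_sync_call that wraps asyncio.run() the way the accepted answer does:

```python
import asyncio


def legacy_sync_call() -> int:
    # Hypothetical legacy sync wrapper around async code.
    coro = asyncio.sleep(0, result=42)
    try:
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()  # avoid a "coroutine was never awaited" warning
        raise


async def main():
    try:
        legacy_sync_call()
    except RuntimeError as exc:
        return str(exc)
    return None


print(legacy_sync_call())        # 42 when no event loop is running
message = asyncio.run(main())
print(message)                   # the RuntimeError message raised inside the running loop
```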
Another approach with a different set of trade-offs is to do the work in a background thread. asgiref provides an asgiref.sync.AsyncToSync adapter that implements this behavior, but it's possible to implement a simplified version yourself. The following allows calling async methods from sync code. Caveats: sync calls will block all async tasks while the method runs, and thread-sensitive code will not work properly (because this adapter uses a background thread).
import asyncio
import queue
import threading
from collections.abc import AsyncIterator, Coroutine, Generator
from typing import TypeVar

T = TypeVar("T")


def async_to_sync(coroutine: Coroutine):
    "Run an async method from sync code."
    q = queue.Queue(maxsize=1)

    async def threadmain():
        try:
            q.put((True, await coroutine))
        except BaseException as ex:
            q.put((False, ex))

    thread = threading.Thread(target=asyncio.run, args=(threadmain(),), daemon=True)
    thread.start()
    success, result = q.get()
    if success:
        return result
    else:
        raise result


def aiter_to_iter(it: AsyncIterator[T]) -> Generator[T, None, None]:
    "Convert an async iterator into a regular (sync) iterator."
    # q_in carries "give me the next item" requests, q_out carries the answers
    q_in = queue.SimpleQueue()
    q_out = queue.SimpleQueue()

    async def threadmain():
        try:
            # Wait until the sync generator requests an item before continuing
            while q_in.get():
                q_out.put((True, await anext(it)))
        except StopAsyncIteration:
            q_out.put((False, None))
        except BaseException as ex:
            q_out.put((False, ex))

    thread = threading.Thread(target=asyncio.run, args=(threadmain(),), daemon=True)
    thread.start()
    try:
        while True:
            q_in.put(True)
            cont, result = q_out.get()
            if cont:
                yield result
            elif result is None:
                break
            else:
                raise result
    finally:
        # Tell the background thread to stop waiting for further requests
        q_in.put(False)
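A variant of the same background-thread idea hands each request to a long-lived loop with asyncio.run_coroutine_threadsafe instead of a pair of queues. This is a sketch of a swapped-in technique, not the adapter above or asgiref's implementation; iterate_via_background_loop, _DONE, numbers and relay are hypothetical names, and the same caveats apply (the calling thread blocks while each item is produced):

```python
import asyncio
import threading
from collections.abc import AsyncIterator, Iterator
from typing import TypeVar

T = TypeVar("T")
_DONE = object()  # hypothetical sentinel marking exhaustion of the async iterator


def iterate_via_background_loop(iterator: AsyncIterator[T]) -> Iterator[T]:
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()

    async def next_item():
        try:
            return await iterator.__anext__()
        except StopAsyncIteration:
            return _DONE

    try:
        while True:
            # Blocks the calling thread until the background loop has produced the item.
            item = asyncio.run_coroutine_threadsafe(next_item(), loop).result()
            if item is _DONE:
                break
            yield item
    finally:
        # Finalize pending async generators, then stop and close the background loop.
        asyncio.run_coroutine_threadsafe(loop.shutdown_asyncgens(), loop).result()
        loop.call_soon_threadsafe(loop.stop)
        thread.join()
        loop.close()


async def numbers() -> AsyncIterator[int]:
    for i in range(3):
        yield i


async def relay(source: AsyncIterator[T]) -> AsyncIterator[T]:
    async for item in source:
        yield item


print(list(iterate_via_background_loop(relay(numbers()))))  # [0, 1, 2]
```

Because the loop lives in its own thread, this can also be called from sync code that itself runs inside an event loop, at the cost of blocking that outer loop while waiting.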
Upvotes: 0
Reputation: 868
Because asyncio is contagious, it's hard to write elegant code that integrates asyncio code into old code. For the scenario above, the following code is a little better, but I don't think it's elegant enough.
import asyncio

async def get_results_async():
    # await fetch from server
    yield 1
    yield 2
    # await fetch next page
    yield 3
    yield 4
    # ....

# Backward compatible sync generator
def get_results():
    gen = get_results_async()
    while True:
        try:
            # Each item is fetched in its own short-lived event loop
            yield asyncio.run(gen.__anext__())
        except StopAsyncIteration:
            break
You can also re-use a single event loop instead of creating a new one for every item.
import asyncio

async def get_results_async():
    # await fetch from server
    yield 1
    yield 2
    # await fetch next page
    yield 3
    yield 4
    # ....

# A loop that you keep somewhere and re-use. asyncio.new_event_loop() is used here
# because asyncio.get_event_loop() no longer creates a loop implicitly on recent Python versions.
loop = asyncio.new_event_loop()

# Backward compatible sync generator
def get_results():
    gen = get_results_async()
    while True:
        try:
            yield loop.run_until_complete(gen.__anext__())
        except StopAsyncIteration:
            break
Upvotes: 7