Reputation: 2304
This is my first time using async generators. I'm using Python 3.9.
This is my implementation:
import asyncio

class SubEventStream():
    def __init__(self) -> None:
        self.queue = asyncio.Queue()
        return

    async def __aiter__(self):
        return self

    async def __anext__(self):
        return await self.pop()

    async def append(self, request):
        return await self.queue.put(request)

    async def pop(self):
        r = await self.queue.get()
        self.queue.task_done()
        return r

def create_append_tasks(ls, q):
    return [
        asyncio.create_task(q.append(i))
        for i in ls
    ]

async def append_tasks(q):
    tasks = create_append_tasks(('a', 'b', 'c', 'd', 'e'), q)
    return await asyncio.gather(*tasks)

async def run():
    q = SubEventStream()
    await append_tasks(q)
    async for v in q:
        print(v)

asyncio.run(run())
Confusingly, this is the result I keep getting:
/tmp/tmp.ie3Dj7Q9hn/test.py:37: RuntimeWarning: coroutine 'SubEventStream.__aiter__' was never awaited
async for v in q:
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Traceback (most recent call last):
File "test.py", line 40, in <module>
asyncio.run(run())
File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
return future.result()
File "test.py", line 37, in run
async for v in q:
TypeError: 'async for' received an object from __aiter__ that does not implement __anext__: coroutine
Clearly I do implement __anext__. What's the hold up?
Upvotes: 3
Views: 4117
Reputation: 1
Working example:
import asyncio

class AsyncCounter():
    def __init__(self, n):
        self.current = 0
        self.n = n

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current >= self.n:
            raise StopAsyncIteration
        await asyncio.sleep(1)
        self.current += 1
        return self.current

async def main():
    async for i in AsyncCounter(4):
        print(i)

asyncio.run(main())
Upvotes: -1
Reputation: 23144
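Calling __aiter__ must return an asynchronous iterator object (which is self in this case). When __aiter__ is defined with async def, calling it returns a coroutine instead, and that coroutine would only produce the asynchronous iterator object after being awaited. async for does not await the result of __aiter__, so it ends up holding a coroutine, which has no __anext__; that is exactly what the TypeError reports.
So in this case the simple fix is to change __aiter__ to a regular method without async: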
    def __aiter__(self):
        return self
Reference: https://docs.python.org/3.9/reference/datamodel.html#asynchronous-iterators, where the typical usage of __aiter__ and __anext__ is also shown.
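For completeness, here is a minimal sketch of the class from the question with that fix applied. One thing to be aware of: as written in the question, __anext__ never raises StopAsyncIteration, so the loop would wait on the queue forever once it is empty. The _DONE sentinel and the close() method below are my own assumptions to make the example terminate; they are not part of the original code.

```python
import asyncio

_DONE = object()  # hypothetical sentinel marking the end of the stream


class SubEventStream:
    def __init__(self) -> None:
        self.queue = asyncio.Queue()

    def __aiter__(self):
        # Regular method: returns the iterator itself, not a coroutine.
        return self

    async def __anext__(self):
        item = await self.queue.get()
        self.queue.task_done()
        if item is _DONE:
            # Tell "async for" that the stream is exhausted.
            raise StopAsyncIteration
        return item

    async def append(self, request):
        await self.queue.put(request)

    async def close(self):
        # Hypothetical helper: signals that no more items will arrive.
        await self.queue.put(_DONE)


async def collect():
    q = SubEventStream()
    for item in ("a", "b", "c"):
        await q.append(item)
    await q.close()
    return [v async for v in q]


result = asyncio.run(collect())
print(result)  # ['a', 'b', 'c']
```

Without something like the sentinel, the fixed class still prints every queued item, but the async for loop then blocks on queue.get() instead of finishing.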
Upvotes: 11
Reputation: 1770
To understand this better, here is a simple example of an asynchronous iterator (note that __aiter__ must be a regular method, not async def):
import asyncio

class AsyncCounter:
    def __init__(self, n):
        self.n = n
        self.current = 0

    # Must be a plain method; with "async def" it would return a coroutine
    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current < self.n:
            await asyncio.sleep(1)  # Simulate some async work
            result = self.current
            self.current += 1
            return result
        else:
            raise StopAsyncIteration

async def main():
    async for i in AsyncCounter(5):
        print(i)

asyncio.run(main())
In the code above, the async for statement calls the __anext__ method on the iterator object returned by __aiter__. Each call waits for a second and then returns the current value of self.current. The loop continues until StopAsyncIteration is raised, which signals the end of the iteration.
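For comparison, the same counter can be sketched as an async generator function, where Python supplies __aiter__ and __anext__ for you. The names async_counter and gather_values are only illustrative, and the sleep is shortened to 0 so the example runs instantly.

```python
import asyncio


async def async_counter(n):
    # "async def" plus "yield" makes this an async generator function.
    for value in range(n):
        await asyncio.sleep(0)  # stand-in for real async work
        yield value


async def gather_values():
    # An async comprehension drives the generator just like "async for".
    return [v async for v in async_counter(5)]


values = asyncio.run(gather_values())
print(values)  # [0, 1, 2, 3, 4]
```

Here there is no __aiter__ to get wrong, which is often the simpler choice when the iteration state fits in local variables.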
What is async for?
The async for statement works like the regular for statement, but it iterates over asynchronous iterators instead of regular iterators, by repeatedly awaiting the iterator's __anext__ method.
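As a rough sketch of what that means, the loop below does by hand approximately what async for does: it calls __aiter__ once without awaiting it, then awaits __anext__ until StopAsyncIteration is raised. This is a simplification for illustration, not the interpreter's exact implementation, and manual_loop is just a name I picked.

```python
import asyncio


class AsyncCounter:
    def __init__(self, n):
        self.n = n
        self.current = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current >= self.n:
            raise StopAsyncIteration
        await asyncio.sleep(0)  # stand-in for real async work
        self.current += 1
        return self.current


async def manual_loop():
    seen = []
    # Step 1: __aiter__ is called but NOT awaited.
    iterator = AsyncCounter(3).__aiter__()
    while True:
        try:
            # Step 2: each item comes from awaiting __anext__.
            value = await iterator.__anext__()
        except StopAsyncIteration:
            # Step 3: this exception ends the loop.
            break
        seen.append(value)
    return seen


manual = asyncio.run(manual_loop())
print(manual)  # [1, 2, 3]
```

Seen this way, it is clear why an async def __aiter__ fails: step 1 would hand back a coroutine, and step 2 would then look for __anext__ on that coroutine.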
Upvotes: -1