Jamie Marshall
Jamie Marshall

Reputation: 2304

Async Generator says it doesn't implement __anext__, when it does

First time using the Aync Generators. I'm using Python 3.9.

This is my implementation:

import asyncio

class SubEventStream():
    def __init__(self) -> None:
        self.queue = asyncio.Queue()
        return

    async def __aiter__(self):
        return self

    async def __anext__(self):
        return await self.pop()

    async def append(self, request):
        return await self.queue.put(request)

    async def pop(self):
        r = await self.queue.get()
        self.queue.task_done()
        return r

def create_append_tasks(ls, q):
    return [
        asyncio.create_task(q.append(i))
        for i in ls
    ]

async def append_tasks(q):
    tasks = create_append_tasks(('a', 'b', 'c', 'd', 'e'), q)
    return await asyncio.gather(*tasks)


async def run():
    q = SubEventStream()
    await append_tasks(q)

    async for v in q:
        print(v)

asyncio.run(run())

Confusingly this is the result I keep getting:

/tmp/tmp.ie3Dj7Q9hn/test.py:37: RuntimeWarning: coroutine 'SubEventStream.__aiter__' was never awaited
  async for v in q:
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Traceback (most recent call last):
  File "test.py", line 40, in <module>
    asyncio.run(run())
  File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "test.py", line 37, in run
    async for v in q:
TypeError: 'async for' received an object from __aiter__ that does not implement __anext__: coroutine

Clearly I do implement __anext__. What's the hold up?

Upvotes: 3

Views: 4117

Answers (3)

Sphynkx
Sphynkx

Reputation: 1

Working example:

import asyncio

class AsyncCounter():
    def __init__(self, n):
        self.current = 0
        self.n = n

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current >= self.n:
            raise StopAsyncIteration
        await asyncio.sleep(1)
        self.current += 1
        return self.current

async def main():
    async for i in AsyncCounter(4):
        print(i)

asyncio.run(main())

Upvotes: -1

mkrieger1
mkrieger1

Reputation: 23144

Calling __aiter__ must return an asynchronous iterator object (which is self in this case). When defined with async def, calling it would return a coroutine resulting in an asynchronous iterator object only after using await.

So in this case the simple fix is to change __aiter__ to a regular method without async:

def __aiter__(self):
    return self

Reference: https://docs.python.org/3.9/reference/datamodel.html#asynchronous-iterators, where also the typical usage of __aiter__ and __anext__ is shown.

Upvotes: 11

Deepanshu Mehta
Deepanshu Mehta

Reputation: 1770

To understand better here is the simple example of Async Generator:

import asyncio

class AsyncCounter:
    def __init__(self, n):
        self.n = n
        self.current = 0

    async def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current < self.n:
            await asyncio.sleep(1)  # Simulate some async work
            result = self.current
            self.current += 1
            return result
        else:
            raise StopAsyncIteration

async def main():
    async for i in AsyncCounter(5):
        print(i)

asyncio.run(main())

in above code, The async for statement calls the __anext__ method on the iterator object returned by __aiter__, which waits for a second and returns the current value of self.current. The loop continues until the StopAsyncIteration exception is raised, which signals the end of the iteration.

What is async for ?

async for statement works similarly to the regular for statement, but it is used to iterate over asynchronous iterators instead of regular iterators, by repeatedly calls the __anext__.

Upvotes: -1

Related Questions