huagang

Reputation: 199

Python: asynchronous generator is already running

I encountered an unusual error when using an async generator, as in the following example.

import asyncio


async def demo():
    async def get_data():
        for i in range(5):  # loop: for or while
            await asyncio.sleep(1)  # some IO code

            yield i

    datas = get_data()

    await asyncio.gather(
        anext(datas),
        anext(datas),
        anext(datas),
        anext(datas),
        anext(datas),
    )


if __name__ == '__main__':
    asyncio.run(demo())

Console output:

2022-05-11 23:55:24,530 DEBUG asyncio 29180 30600 Using proactor: IocpProactor
Traceback (most recent call last):
  File "E:\workspace\develop\python\crawlerstack-proxypool\demo.py", line 77, in <module>
    asyncio.run(demo())
  File "D:\devtools\Python310\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "D:\devtools\Python310\lib\asyncio\base_events.py", line 641, in run_until_complete
    return future.result()
  File "E:\workspace\develop\python\crawlerstack-proxypool\demo.py", line 66, in demo
    await asyncio.gather(
RuntimeError: anext(): asynchronous generator is already running

Situation description: I have loop logic that fetches a batch of data from Redis on each iteration, and I want to use yield to return the results. But this error occurs when I create concurrent tasks.

Is there a good solution for this situation? I don't want to change the way I'm using it now; rather, I want to know whether I can detect that the generator is running, or use something like a lock to wait until it finishes before executing anext.

Maybe my logic is unreasonable; in that case, I would also welcome some critical feedback to help me realize the seriousness of the problem.

Thank you for your help.

Upvotes: 7

Views: 4369

Answers (1)

Anton Bryzgalov

Reputation: 1462

TL;DR: the right way

Async generators are poorly suited to parallel consumption. See my explanation below. As a proper workaround, use an asyncio.Queue for communication between producers and consumers:

import asyncio
import random

queue = asyncio.Queue()

async def producer():
    for item in range(5):
        await asyncio.sleep(random.random())  # imitate async fetching
        print('item fetched:', item)
        await queue.put(item)

async def consumer():
    while True:
        item = await queue.get()
        await asyncio.sleep(random.random())  # imitate async processing
        print('item processed:', item)

await asyncio.gather(producer(), consumer(), consumer())  # run inside a coroutine or an async REPL

The above code snippet works well for an infinite stream of items: for example, a web server, which runs forever serving requests from clients. But what if we need to process a finite number of items? How should consumers know when to stop?

This deserves another question on Stack Overflow to cover all alternatives, but the simplest option is a sentinel approach, described below.

Sentinel: finite data streams approach

Introduce a sentinel = object(). When all the items from an external data source have been fetched and put into the queue, the producer must push as many sentinels to the queue as you have consumers. Once a consumer fetches the sentinel, it knows it should stop, i.e. if item is sentinel: break from the loop.

sentinel = object()
consumers_count = 2

async def producer():
    ...  # the same code as above
    if new_item is None:  # if no new data
        for _ in range(consumers_count):
            await queue.put(sentinel)

async def consumer():
    while True:
        ...  # the same code as above
        if item is sentinel:
            break

await asyncio.gather(
    producer(),
    *(consumer() for _ in range(consumers_count)),
)
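For reference, here is a self-contained, runnable version of the sentinel pattern (the item values and sleep durations are illustrative):

import asyncio
import random

sentinel = object()  # a unique end-of-stream marker
consumers_count = 2

async def producer(queue):
    for item in range(5):
        await asyncio.sleep(random.random())  # imitate async fetching
        await queue.put(item)
    for _ in range(consumers_count):  # one sentinel per consumer
        await queue.put(sentinel)

async def consumer(queue, name):
    while True:
        item = await queue.get()
        if item is sentinel:
            break  # no more data will arrive
        await asyncio.sleep(random.random())  # imitate async processing
        print(name, 'processed item', item)

async def main():
    queue = asyncio.Queue()
    await asyncio.gather(
        producer(queue),
        *(consumer(queue, f'consumer-{i}') for i in range(consumers_count)),
    )

asyncio.run(main())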

TL;DR [2]: a dirty workaround

Since you would rather not change your async generator approach, here is an asyncgen-based alternative. To resolve the issue (in a simple yet dirty way), you may wrap the source async generator with a lock:

async def with_lock(agen, lock: asyncio.Lock):
    while True:
        async with lock:  # only one consumer is allowed to read
            try:
                item = await anext(agen)
            except StopAsyncIteration:
                break
        # exiting lock section => let others consume
        yield item  # consumer processes an item asynchronously

lock = asyncio.Lock()  # a common lock for all consumers
await asyncio.gather(
    # every consumer must have its own "wrapped" generator
    anext(with_lock(datas, lock)),
    anext(with_lock(datas, lock)),
    ...
)

This ensures that only one consumer awaits an item from the generator at a time. While that consumer awaits, the other consumers keep running, so parallelization is not lost.
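Putting the pieces together with the question's code, a minimal end-to-end sketch might look like this (each consumer gets its own wrapped generator, and all of them share one lock):

import asyncio

async def get_data():
    for i in range(5):
        await asyncio.sleep(1)  # some IO code
        yield i

async def with_lock(agen, lock: asyncio.Lock):
    while True:
        async with lock:  # only one consumer is allowed to read
            try:
                item = await anext(agen)
            except StopAsyncIteration:
                break
        yield item

async def demo():
    datas = get_data()
    lock = asyncio.Lock()
    results = await asyncio.gather(
        *(anext(with_lock(datas, lock)) for _ in range(5))
    )
    print(results)  # likely [0, 1, 2, 3, 4]

asyncio.run(demo())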

Important! In the above approach, do not yield await anext(agen) as a single expression under the lock: your wrapping generator would suspend (on yield) with the lock unreleased, and no other consumer would be able to consume an item in parallel. I.e. only wrap the anext call with the lock; do not yield inside the lock section.

Roughly equivalent code with async for (it looks a little smarter):

async def with_lock(agen, lock: asyncio.Lock):
    await lock.acquire()  # hold the lock while fetching an item
    async for item in agen:
        lock.release()  # release before yielding, so others can fetch in parallel
        yield item
        await lock.acquire()  # reacquire before fetching the next item
    lock.release()

However, this code only handles the async generator's anext method, whereas the generator API also includes the aclose and athrow methods. See the explanation below.

Though you may add support for those to the with_lock function too, I would recommend either subclassing the generator and handling the lock support inside, or, better, using the Queue-based approach from above.

See contextlib.aclosing for some inspiration.
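For example, aclosing (added in Python 3.10) is an async context manager which guarantees that the generator's aclose is awaited even if the consumer leaves the loop early:

from contextlib import aclosing

async def consume():
    # get_data is the async generator from the question
    async with aclosing(get_data()) as agen:
        async for item in agen:
            if item == 3:
                break  # agen.aclose() is awaited on exiting the with-block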

Explanation

Both sync and async generators have a special attribute: .gi_running (for regular generators) and .ag_running (for async ones). You may discover them by executing dir on a generator:

>>> dir(i for i in range(0))
[..., 'gi_running', ...]

They are set to True when a generator's .__next__ or .__anext__ method is being executed (next(...) and anext(...) are just syntactic sugar for those).

This prevents re-entering a generator while another next(...) call on the same generator is still executing: if the running flag is True, an exception is raised (a sync generator raises ValueError: generator already executing).

So, returning to your example, when you run await anext(datas) (via asyncio.gather), the following happens:

  1. datas.ag_running is set to True.
  2. The execution flow steps into the datas.__anext__ method.
  3. Once an inner await statement is reached inside the __anext__ method (await asyncio.sleep(1) in your case), asyncio's loop switches to another consumer.
  4. Now another consumer tries to call await anext(datas) too, but since the datas.ag_running flag is still set to True, this results in a RuntimeError (reproduced in the sketch below).
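This is easy to reproduce in isolation. In the sketch below (the sleep stands in for any inner await), the first anext is left suspended on purpose, so both the flag and the error can be observed:

import asyncio

async def get_data():
    await asyncio.sleep(1)  # an inner await inside __anext__
    yield 0

async def main():
    datas = get_data()
    first = asyncio.ensure_future(anext(datas))
    await asyncio.sleep(0)   # let the first anext start and suspend at the inner await
    print(datas.ag_running)  # True
    try:
        await anext(datas)   # a second, parallel anext
    except RuntimeError as e:
        print(e)             # anext(): asynchronous generator is already running
    print(await first)       # 0: the first anext completes normally

asyncio.run(main())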

Why is this flag needed?

A generator's execution can be suspended and resumed, but only at yield statements. Thus, if a generator is paused at an inner await statement, it cannot be "resumed", because its state disallows it.

That's why a parallel next/anext call to a generator raises an exception: it is not ready to be resumed; it is already running.

athrow and aclose

The generator API (both sync and async) includes not only the send/asend methods for iteration, but also:

  • close/aclose to release generator-allocated resources (e.g. a database connection) on exit or an exception
  • and throw/athrow to inform the generator that it has to handle an exception.

aclose and athrow are async methods too, which means that if two consumers try to close or throw into the underlying generator in parallel, you will encounter the same issue: the generator will still be closing (or handling an exception) while being closed (or thrown into) again.
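The same conflict is reproducible for aclose, e.g. when the generator performs a slow async cleanup in its finally block (a sketch):

import asyncio

async def agen():
    try:
        yield 1
    finally:
        await asyncio.sleep(1)  # slow async cleanup, driven by aclose()

async def main():
    g = agen()
    print(await anext(g))  # start the generator: it is now suspended at yield
    closer = asyncio.ensure_future(g.aclose())
    await asyncio.sleep(0)  # let aclose() start and suspend in the finally block
    try:
        await g.aclose()    # a second, parallel aclose()
    except RuntimeError as e:
        print(e)            # aclose(): asynchronous generator is already running
    await closer

asyncio.run(main())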

Sync generators example

Though this is a frequent case for async generators, reproducing it with sync generators is not as straightforward, since sync next(...) calls are rarely interrupted.

One of the ways to interrupt a sync generator is to run a multithreaded code with multiple consumers (run in parallel threads) reading from a single generator. In that case, when the generator's code is interrupted while executing a next call, all other consumers' parallel attempts to call next will result in an exception.
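For instance, in the following sketch two threads read from a single generator; the slow step inside next() makes the calls overlap, so one of the threads is very likely (though not guaranteed) to hit the error:

import threading
import time

def gen():
    for i in range(5):
        time.sleep(0.1)  # a slow step inside next(): releases the GIL
        yield i

g = gen()

def consume():
    try:
        for item in g:
            print(item)
    except ValueError as e:  # raised if the other thread is inside next()
        print(e)  # generator already executing

threads = [threading.Thread(target=consume) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()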

Another way to achieve this is demonstrated in PEP 255 (Simple Generators) via a self-consuming generator:

>>> def g():
...     i = next(me)
...     yield i
... 
>>> me = g()
>>> next(me)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 2, in g
ValueError: generator already executing

When the outer next(me) is called, it sets me.gi_running to True and then executes the generator function's code. The subsequent inner next(me) call then leads to a ValueError.

Conclusion

Generators (especially async ones) work best when consumed by a single reader. Supporting multiple consumers is hard, since it requires patching the behaviour of all the generator's methods, and is thus discouraged.

Upvotes: 16
