Reputation: 199
I encountered an unusual error when using an async generator, as in the following example:
import asyncio

async def demo():
    async def get_data():
        for i in range(5):  # loop: for or while
            await asyncio.sleep(1)  # some IO code
            yield i

    datas = get_data()
    await asyncio.gather(
        anext(datas),
        anext(datas),
        anext(datas),
        anext(datas),
        anext(datas),
    )

if __name__ == '__main__':
    # asyncio.run(main())
    asyncio.run(demo())
Console output:
2022-05-11 23:55:24,530 DEBUG asyncio 29180 30600 Using proactor: IocpProactor
Traceback (most recent call last):
File "E:\workspace\develop\python\crawlerstack-proxypool\demo.py", line 77, in <module>
asyncio.run(demo())
File "D:\devtools\Python310\lib\asyncio\runners.py", line 44, in run
return loop.run_until_complete(main)
File "D:\devtools\Python310\lib\asyncio\base_events.py", line 641, in run_until_complete
return future.result()
File "E:\workspace\develop\python\crawlerstack-proxypool\demo.py", line 66, in demo
await asyncio.gather(
RuntimeError: anext(): asynchronous generator is already running
Situation description: I have loop logic that fetches a batch of data from Redis at a time, and I want to use yield to return the results. But this error occurs when I create concurrent tasks.
Is there a good solution for this situation? I don't want to change my current approach; rather, I'd like to know whether I can detect that the generator is already running, or use something like a lock to wait until it finishes before calling anext.
Maybe my current logic is unreasonable, but I would also like to understand the underlying mechanics, so I can appreciate why this is a problem.
Thank you for your help.
Upvotes: 7
Views: 4369
Reputation: 1462
Async generators are a poor fit for parallel consumption (see my explanation below). As a proper workaround, use asyncio.Queue for communication between producers and consumers:
import asyncio
import random

queue = asyncio.Queue()

async def producer():
    for item in range(5):
        await asyncio.sleep(random.random())  # imitate async fetching
        print('item fetched:', item)
        await queue.put(item)

async def consumer():
    while True:
        item = await queue.get()
        await asyncio.sleep(random.random())  # imitate async processing
        print('item processed:', item)

await asyncio.gather(producer(), consumer(), consumer())
The above code snippet works well for an infinite stream of items: for example, a web server, which runs forever serving requests from clients. But what if we need to process a finite number of items? How should consumers know when to stop?
This deserves another question on Stack Overflow to cover all alternatives, but the simplest option is the sentinel approach, described below.
Introduce a sentinel = object(). When all items from the external data source have been fetched and put into the queue, producer must push as many sentinels into the queue as there are consumers. Once a consumer fetches the sentinel, it knows it should stop: if item is sentinel: break out of the loop.
sentinel = object()
consumers_count = 2

async def producer():
    ...  # the same code as above
    if new_item is None:  # if no new data
        for _ in range(consumers_count):
            await queue.put(sentinel)

async def consumer():
    while True:
        ...  # the same code as above
        if item is sentinel:
            break

await asyncio.gather(
    producer(),
    *(consumer() for _ in range(consumers_count)),
)
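For reference, here is a minimal self-contained sketch of the sentinel pattern with the elided parts filled in; the fixed range(5) source imitating a data feed is an assumption for illustration:
import asyncio
import random

queue = asyncio.Queue()
sentinel = object()
consumers_count = 2

async def producer():
    for item in range(5):  # stand-in for the real data source
        await asyncio.sleep(random.random())  # imitate async fetching
        await queue.put(item)
    for _ in range(consumers_count):  # one sentinel per consumer
        await queue.put(sentinel)

async def consumer():
    while True:
        item = await queue.get()
        if item is sentinel:
            break
        await asyncio.sleep(random.random())  # imitate async processing
        print('item processed:', item)

async def main():
    await asyncio.gather(
        producer(),
        *(consumer() for _ in range(consumers_count)),
    )

asyncio.run(main())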
Since you would rather not change your async generator approach, here is an asyncgen-based alternative. To resolve the issue (in a simple yet dirty way), you may wrap the source async generator with a lock:
async def with_lock(agen, lock: asyncio.Lock):
    while True:
        async with lock:  # only one consumer is allowed to read
            try:
                item = await anext(agen)
            except StopAsyncIteration:
                break
        # exiting the lock section => let others consume
        yield item  # the consumer processes the item asynchronously

lock = asyncio.Lock()  # a common lock for all consumers
await asyncio.gather(
    # every consumer must have its own "wrapped" generator
    anext(with_lock(datas, lock)),
    anext(with_lock(datas, lock)),
    ...
)
This ensures that only one consumer awaits an item from the generator at a time. While that consumer awaits, the other consumers keep running, so parallelization is not lost.
Important! In the above approach, do not yield await anext(agen) as a single expression under the lock: your wrapping generator would suspend (on yield) with the lock unreleased, and no other consumer would be able to consume an item in parallel. That is, only wrap the anext call with the lock; do not yield inside the lock section.
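To make the pitfall concrete, here is a sketch of the broken variant described above (an anti-pattern, shown only for contrast):
async def with_lock_broken(agen, lock: asyncio.Lock):
    while True:
        async with lock:
            try:
                # BUG: yielding inside the lock section suspends this
                # wrapper while the lock is still held, so all other
                # consumers stay blocked until this one is resumed.
                yield await anext(agen)
            except StopAsyncIteration:
                break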
A roughly equivalent version using async for (it looks a little smarter):
async def with_lock(agen, lock: asyncio.Lock):
    await lock.acquire()
    async for item in agen:
        lock.release()
        yield item
        await lock.acquire()
    lock.release()
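Each consumer can then iterate its own wrapped generator over a shared lock. In this usage sketch, process_item is a hypothetical stand-in for your processing coroutine:
async def consume(agen, lock: asyncio.Lock):
    async for item in with_lock(agen, lock):
        await process_item(item)  # hypothetical processing coroutine

lock = asyncio.Lock()
datas = get_data()
await asyncio.gather(consume(datas, lock), consume(datas, lock))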
However, with_lock only handles the async generator's anext method, whereas the generator API also includes the aclose and athrow methods (see the explanation below). Though you could add support for those to the with_lock function too, I would recommend either subclassing the generator and handling the lock support inside, or better, using the Queue-based approach from above. See contextlib.aclosing for some inspiration.
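As a sketch of the subclassing idea (a hypothetical wrapper, not an existing API), all three methods can be serialized behind a single lock:
import asyncio

class SerializedAgen:
    """Wraps an async generator so that __anext__, athrow and aclose
    are serialized behind one lock (an illustrative sketch)."""

    def __init__(self, agen):
        self._agen = agen
        self._lock = asyncio.Lock()

    def __aiter__(self):
        return self

    async def __anext__(self):
        async with self._lock:
            return await self._agen.__anext__()

    async def athrow(self, exc):
        async with self._lock:
            return await self._agen.athrow(exc)

    async def aclose(self):
        async with self._lock:
            await self._agen.aclose()
Consumers can then share a single SerializedAgen instance and read from it with plain async for loops.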
Both sync and async generators have a special attribute: .gi_running (for regular generators) and .ag_running (for async ones). You may discover them by calling dir on a generator:
>>> dir(i for i in range(0))
[..., 'gi_running', ...]
They are set to True while a generator's .__next__ or .__anext__ method is executing (next(...) and anext(...) are just syntactic sugar for those).
This prevents re-executing next(...) on a generator while another next(...) call on the same generator is still running: if the running flag is True, an exception is raised (for a sync generator it raises ValueError: generator already executing).
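As a quick illustration (a throwaway snippet assuming Python 3.10+, where anext is a builtin), you can watch the flag flip while __anext__ is suspended at an inner await:
import asyncio

async def agen():
    await asyncio.sleep(0.1)  # inner await, not a yield point
    yield 1

async def main():
    g = agen()
    print(g.ag_running)                    # False: nothing requested yet
    task = asyncio.ensure_future(anext(g))
    await asyncio.sleep(0)                 # let the task start __anext__
    print(g.ag_running)                    # True: suspended at the inner await
    print(await task)                      # 1
    print(g.ag_running)                    # False again

asyncio.run(main())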
So, returning to your example: when you run await anext(datas) (via asyncio.gather), the following happens:
1. datas.ag_running is set to True.
2. The event loop starts executing the datas.__anext__ method.
3. Once an inner await statement is reached inside the __anext__ method (await asyncio.sleep(1) in your case), asyncio's loop switches to another consumer.
4. That consumer tries to run await anext(datas) too, but since the datas.ag_running flag is still set to True, this results in a RuntimeError.
A generator's execution can be suspended and resumed, but only at yield statements. Thus, if a generator is paused at an inner await statement, it cannot be "resumed", because its state disallows it.
That's why a parallel next/anext call to such a generator raises an exception: it is not ready to be resumed; it is already running.
athrow and aclose
The generators' API (both sync and async) includes not only the send/asend methods used for iteration, but also:
- close/aclose to release generator-allocated resources (e.g. a database connection) on exit or on an exception;
- throw/athrow to inform the generator that it has to handle an exception.
aclose and athrow are async methods too, which means that if two consumers try to close/throw into an underlying generator in parallel, you will encounter the same issue, since the generator will be closing (or handling an exception) while being closed (or thrown into) again.
Though this is a frequent case for async generators, reproducing it for sync generators is not so straightforward, since sync next(...) calls are rarely interrupted.
One way to interrupt a sync generator is to run multithreaded code with multiple consumers (running in parallel threads) reading from a single generator. In that case, while the generator's code is interrupted during a next call, all other consumers' parallel attempts to call next will result in an exception.
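A minimal sketch of that multithreaded scenario; the time.sleep inside the generator is an assumption standing in for any slow step:
import threading
import time

def slow_gen():
    for i in range(3):
        time.sleep(0.5)  # a slow step inside next(), releases the GIL
        yield i

g = slow_gen()

def consume():
    try:
        print('got:', next(g))
    except ValueError as e:
        print('error:', e)  # "generator already executing"

threads = [threading.Thread(target=consume) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()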
Another way to achieve this is demonstrated in PEP 255 (the generators PEP) via a self-consuming generator:
>>> def g():
... i = next(me)
... yield i
...
>>> me = g()
>>> next(me)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 2, in g
ValueError: generator already executing
When the outer next(me) is called, it sets me.gi_running to True and then executes the generator function's code. The subsequent inner next(me) call then leads to a ValueError.
Generators (especially async ones) work best when consumed by a single reader. Supporting multiple consumers is hard, since it requires patching the behaviour of all the generator's methods, and is thus discouraged.
Upvotes: 16