user48956

Reputation: 15788

How can I use asyncio.Condition within a Task in Python < v3.10

Is there any reason why I can't use an asyncio.Condition within a Task?

c = asyncio.Condition()

async def a():
    print("A ..")
    # await asyncio.sleep(0.2) # This works
    async with c:
        # RuntimeError: Task <Task pending coro=<a() running at ..this file..:13>> got Future <Future pending> attached to a different loop
        await c.wait()   #

async def main():
    asyncio.get_event_loop().create_task(a())
    await asyncio.sleep(2)

It fails with: "got Future <Future pending> attached to a different loop"

I don't think I created a new loop.

Full example here:

import asyncio

c = asyncio.Condition()

async def a():
    print("A ..")
    # await asyncio.sleep(0.2) # This works
    async with c:
        # RuntimeError: Task <Task pending coro=<a() running at ..this file..:13>> got Future <Future pending> attached to a different loop
        await c.wait()   #
    print("A done")

async def b():
    await asyncio.sleep(2)
    print("B ..")
    async with c:
        c.notify_all()
    print("B done")
    await asyncio.sleep(1)

async def main():
    asyncio.get_event_loop().create_task(a())
    await b()

asyncio.run(main())

I see the same error with Python 3.7, 3.8 and 3.9.

Upvotes: 0

Views: 789

Answers (2)

thisisalsomypassword

Reputation: 1611

The docs for asyncio.Condition.notify_all() state:

The lock must be acquired before this method is called and released shortly after. If called with an unlocked lock a RuntimeError error is raised.

The lock is released inside a() when it calls c.wait(), so the Lock inside c is unlocked by the time you call c.notify_all().

You need to hold the lock before calling notify_all(). Using

 async with c:
     c.notify_all()

makes your example work as expected.
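To make the documented rule concrete, here is a minimal sketch (not taken from your code; the variable names are made up for the illustration) that calls notify_all() once without the lock and once while holding it. The Condition is created inside the coroutine so that the sketch does not depend on the Python version:

```python
import asyncio

async def main():
    c = asyncio.Condition()  # created while the loop is running

    unlocked_error = None
    try:
        c.notify_all()  # lock is not held here
    except RuntimeError as exc:
        unlocked_error = exc

    async with c:
        c.notify_all()  # lock is held, so this is allowed
        locked_ok = True

    return unlocked_error, locked_ok

unlocked_error, locked_ok = asyncio.run(main())
print(repr(unlocked_error), locked_ok)
```

With no waiters registered, the second call simply does nothing; the point is only that it does not raise.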

UPDATE

I tested this only on Python 3.10.1, where it works. It does fail when I run it on Python 3.8.5, but there the problem stems from using the Condition as a global variable. In your example the Condition is created at import time, before asyncio.run() creates its event loop. Before Python 3.10 the constructor binds the Condition to whatever loop asyncio.get_event_loop() returns at that moment, so it ends up attached to a different loop than the one asyncio.run() later creates and runs. I updated your example so that the Condition is created while the loop is already running, which makes it work again on Python 3.8.5:

import asyncio

async def a(c):
    print("A ..")
    async with c:
        await c.wait()
    print("A done")

async def b(c):
    await asyncio.sleep(2)
    print("B ..")
    async with c:
        c.notify_all()
    print("B done")
    await asyncio.sleep(1)

async def main():
    c = asyncio.Condition() # loop already running
    asyncio.create_task(a(c)) # get_event_loop() also works but the use is discouraged in the docs
    await b(c)

asyncio.run(main())

Upvotes: 1

user48956

Reputation: 15788

As @JanWilamowski points out, this seems to be a bug with Python < v3.10. Works fine in 3.10.

Booo!

It also seems that you can't work around this bug by switching from Condition to Queue (see below); before 3.10 a Queue is likewise bound to an event loop when it is constructed. Similarly, this also failed:

async def main():
    loop.create_task(a())
    ...

loop = asyncio.new_event_loop()
loop.run_until_complete(main())
# asyncio.run(main())

However, for reasons unclear, this does work:

async def main():
    loop.create_task(a())
    ...

loop = asyncio.get_event_loop()  # the only change: get_event_loop() instead of new_event_loop()
loop.run_until_complete(main())
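A likely explanation, given that before 3.10 Condition() binds itself to asyncio.get_event_loop() at construction: the module-level c is attached to the default loop, and get_event_loop() hands back that same loop, whereas new_event_loop() and asyncio.run() each create a fresh one. The sketch below reproduces the mechanism with a bare future instead of a Condition, so it should behave the same on any Python version. wait_on_future, default_loop and other_loop are names made up for the illustration.

```python
import asyncio

def wait_on_future(future_loop, task_loop):
    """Create a future on future_loop, then await it from a task on task_loop."""
    fut = future_loop.create_future()

    async def waiter():
        # Resolve the future on a later loop iteration, once we are waiting.
        asyncio.get_running_loop().call_soon(fut.set_result, "ok")
        return await fut

    try:
        return task_loop.run_until_complete(waiter())
    except RuntimeError as exc:
        return f"RuntimeError: {exc}"

# Stands in for the loop that get_event_loop() returned at import time.
default_loop = asyncio.new_event_loop()
# Stands in for the loop made by new_event_loop() or asyncio.run().
other_loop = asyncio.new_event_loop()

same = wait_on_future(default_loop, default_loop)
different = wait_on_future(default_loop, other_loop)
print(same)
print(different)

default_loop.close()
other_loop.close()
```

Awaiting on the loop the future belongs to succeeds; awaiting it from a task on the other loop produces the "attached to a different loop" error from the question.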

Full working example below:

import asyncio
import time

USE_QUEUE = False

if not USE_QUEUE:
    c = asyncio.Condition()
else:
    q = asyncio.Queue()

async def a():
    print("A ..")
    # await asyncio.sleep(0.2) # This works
    if not USE_QUEUE:
        async with c:
            await c.wait()
    else:
        result = await q.get()
        q.task_done()
        print("result", result)
    print("A done", time.time())

async def b():
    await asyncio.sleep(1)
    print("B ..", time.time())
    if not USE_QUEUE:
        async with c:
            c.notify_all()
    else:
        await q.put(123)  # put() returns None, so there is nothing to keep
    await asyncio.sleep(1)
    print("B done")

async def main():
    loop.create_task(a())
    # asyncio.get_event_loop().create_task(a())
    await b()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
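If the Condition has to stay reachable at module level, one alternative that does not depend on which loop get_event_loop() returns is to create it lazily, the first time a coroutine asks for it. This is only a sketch: get_condition and _condition are hypothetical names, and the sleeps are shortened.

```python
import asyncio

_condition = None  # hypothetical module-level slot, filled on first use

def get_condition():
    """Return the shared Condition, creating it on first call.

    Call this from inside a coroutine so a loop is already running.
    """
    global _condition
    if _condition is None:
        _condition = asyncio.Condition()
    return _condition

async def a(log):
    c = get_condition()
    async with c:
        await c.wait()
    log.append("A done")

async def b(log):
    await asyncio.sleep(0.05)  # give a() time to start waiting
    c = get_condition()
    async with c:
        c.notify_all()
    log.append("B notified")

async def main():
    log = []
    task = asyncio.create_task(a(log))
    await b(log)
    await task  # make sure a() has finished before returning
    return log

events = asyncio.run(main())
print(events)
```

One caveat: on Python < 3.10 the cached Condition stays bound to the first loop, so calling asyncio.run() a second time in the same process would hit the original error again unless _condition is reset.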

Upvotes: 0
