confiq

Reputation: 2928

How to stop coroutines in asyncio without the warning "Task was destroyed but it is pending!"

I have a synchronous loop in which I want to run asyncio tasks. So far I have written this proof of concept:

import asyncio
import random
import time

async def consumer(queue):
    while True:
        revision = await queue.get()
        await asyncio.sleep(revision/10 * random.random() * 2)
        queue.task_done()
        print(f'done working on {revision}')

def main():
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue()
    task = loop.create_task(consumer(queue))
    for run in range(1, 10):
        print(f'produced {run}')
        # the idea is to start the task while still working on the synchronous loop
        loop.run_until_complete(queue.put(run))
        time.sleep(.1)
    print('---- done producing')
    # loop.run_until_complete(task)  # runs forever
    # loop.run_until_complete(asyncio.gather(task))  # runs forever
    loop.run_until_complete(queue.join())  # gives: Task was destroyed but it is pending!

main()

The code runs fine and produces exactly the output I want:

produced 1
produced 2
done working on 1
produced 3
produced 4
done working on 2
produced 5
produced 6
produced 7
done working on 3
produced 8
produced 9
---- done producing
done working on 4
done working on 5
done working on 6
done working on 7
done working on 8
done working on 9
Task was destroyed but it is pending!
task: <Task pending name='Task-1' coro=<consumer() done, defined at .../deleteme.py:8> wait_for=<Future cancelled>>

Everything is as expected except for the final warning (a debug message? I'm on Python 3.9.1); it is not an exception. I know I'm doing something wrong, but I can't figure out what exactly. Rewriting this with multiprocessing would sidestep it, but this problem has been bugging me for almost the whole afternoon.

Upvotes: 0

Views: 2044

Answers (1)

furas

Reputation: 143197

Your task runs consumer, which loops with while True, so it never finishes. That is what causes the problem: when the program ends, the task is still pending.

This is also why run_until_complete(task) runs forever: the while True loop never exits.

You have to stop this task. You could send a special value through the queue, e.g. 'stop', check for it with revision == 'stop', and use return to exit the while True loop:
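
To make that state visible, here is a minimal sketch with a stripped-down consumer. The names demo and pending_after_join are made up for illustration, and it uses asyncio.run instead of the loop calls from the question. After queue.join() returns, the task is still pending because it is parked on the next queue.get():

```python
import asyncio

async def consumer(queue):
    # Same shape as the consumer in the question: the loop has no exit condition.
    while True:
        await queue.get()
        queue.task_done()

async def demo():
    queue = asyncio.Queue()
    task = asyncio.create_task(consumer(queue))
    await queue.put(1)
    await queue.join()               # returns once every item is marked done
    still_pending = not task.done()  # the consumer is waiting on queue.get() again
    task.cancel()                    # clean up so this demo itself does not warn
    await asyncio.gather(task, return_exceptions=True)
    return still_pending

pending_after_join = asyncio.run(demo())
print(pending_after_join)
```

If the program ended right after queue.join() without that cleanup, the still-pending task is what the warning would be reporting.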

while True:
    revision = await queue.get()

    if revision == 'stop':
        queue.task_done()
        print(f'done working on {revision}')
        return  # <-- exit `task`

    await asyncio.sleep(revision/10 * random.random() * 2)
    
    queue.task_done()
    print(f'done working on {revision}')

And you have to send 'stop' as the last value in the queue:

loop.run_until_complete(queue.put('stop'))

After that, all three versions work:

    loop.run_until_complete(task)                  # now works
    loop.run_until_complete(asyncio.gather(task))  # now works
    loop.run_until_complete(queue.join())          # now works

Full working code:

import asyncio
import time
import random

async def consumer(queue):
    while True:
        revision = await queue.get()

        if revision == 'stop':
            queue.task_done()
            print(f'done working on {revision}')
            return

        await asyncio.sleep(revision/10 * random.random() * 2)
        
        queue.task_done()
        print(f'done working on {revision}')


def main():
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue()
    task = loop.create_task(consumer(queue))
    for run in range(1, 10):
        print(f'produced {run}')
        # the idea is to start the task while still working on the synchronous loop
        loop.run_until_complete(queue.put(run))
        time.sleep(.5)
    print('---- done producing')
    
    print('produced stop')
    loop.run_until_complete(queue.put('stop'))
    
    loop.run_until_complete(task)  # now works
    #loop.run_until_complete(asyncio.gather(task))  # now works
    #loop.run_until_complete(queue.join())  # now works

main()
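
A possible alternative, not part of the code above: instead of sending a 'stop' value, you could cancel the task once queue.join() has returned and then let the loop process the cancellation. This is only a sketch under a few assumptions. The results list and the return value of main are added for illustration, the sleeps are shortened, and asyncio.new_event_loop() plus asyncio.set_event_loop() replace get_event_loop() so that the queue is bound to the right loop on older Python versions as well.

```python
import asyncio
import random
import time

async def consumer(queue, results):
    # No stop value here: the loop only ends when the task is cancelled.
    while True:
        revision = await queue.get()
        await asyncio.sleep(revision / 100 * random.random())
        results.append(revision)
        queue.task_done()

def main():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)  # older Pythons bind Queue() to the current loop
    queue = asyncio.Queue()
    results = []                  # illustrative: records what the consumer handled
    task = loop.create_task(consumer(queue, results))
    for run in range(1, 4):
        loop.run_until_complete(queue.put(run))
        time.sleep(0.01)          # stands in for the synchronous work
    loop.run_until_complete(queue.join())  # wait until every item is processed
    task.cancel()                          # request cancellation of the idle consumer
    # Run the loop once more so the cancellation is delivered;
    # return_exceptions=True keeps CancelledError from propagating.
    loop.run_until_complete(asyncio.gather(task, return_exceptions=True))
    loop.close()
    return results, task.cancelled()

results, was_cancelled = main()
print(results, was_cancelled)
```

The trade-off is that cancellation stops the consumer wherever it is currently waiting, so it should only be requested after queue.join() has confirmed that nothing is left to process.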

Upvotes: 2
