spyder

Reputation: 155

asyncio task was destroyed but it is pending

I am working on a sample program that reads from a data source (CSV or RDBMS) in chunks, applies some transformations, and sends the data via a socket to a server.

But because the CSV is very large, for testing purposes I want to stop reading after a few chunks. Unfortunately something goes wrong and I do not know what or how to fix it. Probably I have to do some cancellation, but I am not sure where and how. I get the following error:

Task was destroyed but it is pending!
task: <Task pending coro=<<async_generator_athrow without __name__>()>>

The sample code is:

import asyncio
import json

async def readChunks():
  # this is basically a dummy alternative for reading csv in chunks
  df = [{"chunk_" + str(x) : [r for r in range(10)]} for x in range(10)]
  for chunk in df:
    await asyncio.sleep(0.001)
    yield chunk

async def send(row):
    j = json.dumps(row)
    print(f"to be sent: {j}")
    await asyncio.sleep(0.001)


async def main():
    i = 0
    async for chunk in readChunks():
        for k, v in chunk.items():
            await asyncio.gather(send({k:v}))
        i += 1
        if i > 5:
            break
        #print(f"item in main via async generator is {chunk}")
    

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

Upvotes: 7

Views: 7069

Answers (4)

wwii

Reputation: 23743

This works...

import asyncio
import json
import logging

logging.basicConfig(format='%(asctime)s.%(msecs)03d %(message)s',
                    datefmt='%S')
root = logging.getLogger()
root.setLevel(logging.INFO)

async def readChunks():
  # this is basically a dummy alternative for reading csv in chunks
  df = [{"chunk_" + str(x) : [r for r in range(10)]} for x in range(10)]
  for chunk in df:
    await asyncio.sleep(0.002)
    root.info('readChunks: next chunk coming')
    yield chunk

async def send(row):
    j = json.dumps(row)
    root.info(f"to be sent: {j}")
    await asyncio.sleep(0.002)


async def main():
    i = 0
    root.info('main: starting to read chunks')
    async for chunk in readChunks():
        for k, v in chunk.items():
            root.info(f'main: sending an item')
            #await asyncio.gather(send({k:v}))
            stuff = await send({k:v})
        i += 1
        if i > 5:
            break
        #print(f"item in main via async generator is {chunk}")

##loop = asyncio.get_event_loop()
##loop.run_until_complete(main())
##loop.close()

if __name__ == '__main__':

    asyncio.run(main())

... At least it runs and finishes.


The issue with stopping an async generator by breaking out of an async for loop is described in bugs.python.org/issue38013 and looks like it was fixed in 3.7.5.

However, using

loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(main())
loop.close()

I get a debug message but no exception in Python 3.8.

Task was destroyed but it is pending!
task: <Task pending name='Task-8' coro=<<async_generator_athrow without __name__>()>>

Using the higher level API asyncio.run(main()) with debugging ON I do not get the debug message. Even if you are going to try to upgrade to Python 3.7.5-9, you probably should still use asyncio.run().
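The reason asyncio.run() avoids the message is that it shuts down any remaining async generators before closing the loop. If you do have to drive the loop manually, a minimal sketch of a rough equivalent (assuming Python 3.6+, where loop.shutdown_asyncgens() is available) looks like this:

import asyncio

# Roughly what asyncio.run(main()) does for cleanup: after main() finishes,
# shutdown_asyncgens() closes any async generators that are still alive,
# so their cleanup runs while the event loop is still usable.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()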

Upvotes: 2

MisterMiyagi

Reputation: 50066

Many async resources, such as generators, need to be cleaned up with the help of an event loop. When an async for loop stops iterating an async generator via break, the generator is cleaned up by the garbage collector only. This means the task is pending (waits for the event loop) but gets destroyed (by the garbage collector).

The most straightforward fix is to aclose the generator explicitly:

async def main():
    i = 0
    aiter = readChunks()      # name iterator in order to ...
    try:
        async for chunk in aiter:
            ...
            i += 1
            if i > 5:
                break
    finally:
        await aiter.aclose()  # ... clean it up when done

These patterns can be simplified using the asyncstdlib library (disclaimer: I maintain this library). asyncstdlib.islice lets you take a fixed number of items before cleanly closing the generator:

import asyncstdlib as a

async def main():
    async for chunk in a.islice(readChunks(), 5):
        ...

If the break condition is dynamic, scoping the iterator guarantees cleanup in any case:

import asyncstdlib as a

async def main():
    async with a.scoped_iter(readChunks()) as aiter:
        async for idx, chunk in a.enumerate(aiter):
            ...
            if idx >= 5:
                break

Upvotes: 7

7u5h4r

Reputation: 457

Your readChunks generator is still running asynchronously inside your loop, and you break out of the loop before it has completed.

That is why you get "asyncio task was destroyed but it is pending".

In short, the async task was still doing its work in the background, but you killed it by breaking out of the loop (and ending the program).

Upvotes: 0

alex_noname

Reputation: 32053

The problem is simple. You exit the loop early, but the async generator is not exhausted yet (it is still pending):

...
if i > 5:
    break
...
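One way to make sure the generator is closed even on such an early exit is to wrap it in contextlib.aclosing from the standard library (Python 3.10+; on older versions, an explicit aclose() as shown in the other answer works the same way). A minimal sketch, reusing readChunks and send from the question:

import asyncio
from contextlib import aclosing  # stdlib, Python 3.10+

async def main():
    i = 0
    # aclosing() awaits the generator's aclose() when the block exits,
    # even when we leave early via break, so no pending task is left behind
    async with aclosing(readChunks()) as chunks:
        async for chunk in chunks:
            for k, v in chunk.items():
                await send({k: v})
            i += 1
            if i > 5:
                break

asyncio.run(main())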

Upvotes: 0
