Blark

Reputation: 307

Delete/destroy finished futures asyncio Python

I have a Python script that uses asyncio to make over a million requests. I first ran into memory issues and then discovered semaphores. I have since implemented a semaphore to limit the number of concurrent tasks, as well as the number of tasks in the queue at one time.

My program loads a list of requests like this:

        with open(wordlist) as words:
            w = words.read().splitlines()

Then that list gets passed to the following function for processing and the actual work gets done.

async def _process_dns_wordlist(self, wordlist, domain):
    """Takes a list of words and adds them to the task list as space is available"""
    for word in wordlist:
        # Wait on the semaphore before adding more tasks
        await self.sem.acquire()
        host = '{}.{}'.format(word, domain)
        task = asyncio.ensure_future(self._dns_lookup(host))
        task.add_done_callback(functools.partial(self._dns_result_callback, host))
        self.tasks.append(task)
    await asyncio.gather(*self.tasks, return_exceptions=True)

Before I implemented the semaphore, the program would just crash, running out of memory while I was queueing up all the tasks. Now it runs for a while and then crashes because it runs out of memory about halfway through the requests.

I assume that this is because after the future is processed by my callback it sits there in memory wasting space. My problem is I cannot figure out what to use to delete the processed future once I'm done with it. I've read the asyncio docs and I don't see a destroy/delete method. Am I missing something really obvious?

Thanks for your help!

Upvotes: 2

Views: 3965

Answers (3)

papeto

Reputation: 5

https://docs.python.org/3.12/library/asyncio-task.html#creating-tasks

For a truly background task where you don't need any post-processing, the asyncio documentation recommends a discard callback:

background_tasks = set()

for i in range(10):
    task = asyncio.create_task(some_coro(param=i))

    # Add task to the set. This creates a strong reference.
    background_tasks.add(task)

    # To prevent keeping references to finished tasks forever,
    # make each task remove its own reference from the set after
    # completion:
    task.add_done_callback(background_tasks.discard)

For your situation, you would instead call self.tasks.discard(future) in your self._dns_result_callback after you have finished processing the result. Note that discard is a set method, so self.tasks would need to be a set rather than a list.
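A minimal sketch of how that might look in the asker's setup, assuming self.tasks is changed from a list to a set and that the semaphore is released in the callback (the question does not show where it is released). The Resolver class, the stubbed _dns_lookup, and the completed/peak counters are hypothetical stand-ins added only so the example runs on its own:

```python
import asyncio
import functools


class Resolver:
    """Hypothetical stand-in for the asker's class; only the bookkeeping matters."""

    def __init__(self, limit=100):
        self.limit = limit
        self.tasks = set()   # a set, so finished tasks can be discarded cheaply
        self.sem = None      # created inside the running loop in process()
        self.completed = 0   # illustration only: count results instead of storing them
        self.peak = 0        # illustration only: largest number of tracked tasks

    async def _dns_lookup(self, host):
        # Placeholder for the real lookup (assumption: it is a coroutine).
        await asyncio.sleep(0)
        return "192.0.2.1"

    def _dns_result_callback(self, host, future):
        # Process the result first, then drop the reference and free a slot.
        if not future.cancelled() and future.exception() is None:
            self.completed += 1
        self.tasks.discard(future)
        self.sem.release()

    async def process(self, wordlist, domain):
        self.sem = asyncio.Semaphore(self.limit)
        for word in wordlist:
            # Wait for a free slot before scheduling another lookup.
            await self.sem.acquire()
            host = "{}.{}".format(word, domain)
            task = asyncio.ensure_future(self._dns_lookup(host))
            task.add_done_callback(
                functools.partial(self._dns_result_callback, host))
            self.tasks.add(task)
            self.peak = max(self.peak, len(self.tasks))
        # Only the lookups still in flight are left in the set at this point.
        await asyncio.gather(*self.tasks, return_exceptions=True)


if __name__ == "__main__":
    resolver = Resolver(limit=10)
    words = ["host{}".format(i) for i in range(1000)]
    asyncio.run(resolver.process(words, "example.com"))
    print(resolver.completed, len(resolver.tasks), resolver.peak)
```

Because each finished task removes itself, the set should never hold more tasks than the semaphore limit, regardless of how long the wordlist is.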

Upvotes: 0

Brett Stottlemyer

Reputation: 2852

Do you need to store the tasks for some reason? As I understand it, each task object is kept by the event loop. When you store another reference in self.tasks, that reference is still there after the event loop has finished with the task. This keeps the future (which is done) from being garbage collected.

I would guess the code would work just as well, with less memory usage, if you changed self.tasks to a local tasks variable that is passed to gather and then goes out of scope.

This would be more efficient (and readable) than removing the future in the callback, but would have the same net effect.
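If the tasks do not need to be stored at all, one way to go a step further is a fixed pool of worker coroutines that pull hosts from a shared iterator, so no per-request task list exists in the first place. This is a different pattern from the local variable suggested above, offered only as a sketch; fake_lookup, worker, process_wordlist, and the handle callback are hypothetical names, not part of the asker's code:

```python
import asyncio


async def fake_lookup(host):
    """Hypothetical placeholder for the real DNS lookup coroutine."""
    await asyncio.sleep(0)
    return host


async def worker(hosts, handle):
    # Each worker takes the next host from the shared iterator until it
    # is exhausted, so nothing is queued up ahead of time.
    for host in hosts:
        try:
            handle(host, await fake_lookup(host))
        except Exception as exc:
            # Pass failures to the handler instead of stopping the worker.
            handle(host, exc)


async def process_wordlist(wordlist, domain, handle, concurrency=100):
    # A generator builds each host name lazily, one at a time.
    hosts = ("{}.{}".format(word, domain) for word in wordlist)
    # Only `concurrency` tasks exist, however long the wordlist is.
    await asyncio.gather(*(worker(hosts, handle) for _ in range(concurrency)))


if __name__ == "__main__":
    asyncio.run(process_wordlist(
        ["www", "mail", "ftp"], "example.com",
        lambda host, result: print(host, result), concurrency=2))
```

The concurrency limit here plays the role of the semaphore: at most that many lookups are in flight, and a finished lookup leaves nothing behind once its result has been handled.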

Upvotes: 2

Blark

Reputation: 307

It turns out that the answer was pretty simple; however, I'm not sure if this is the correct way to do it.

In my callback, after I was done processing the result I did this:

self.tasks.remove(future)

This successfully solved my memory problem. If you have a better way to deal with this please let me know!

Upvotes: 1
