Reputation: 307
I have a Python script which uses asyncio to make over a million requests. I first ran into memory issues and then discovered semaphores; I have since implemented a semaphore successfully to limit the number of concurrent tasks, as well as the number of tasks in the queue, at any one time.
My program loads a list of requests like this:
with open(wordlist) as words:
    w = words.read().splitlines()
Then that list gets passed to the following function, where the actual work gets done.
async def _process_dns_wordlist(self, wordlist, domain):
    """Takes a list of words and adds them to the task list as space is available"""
    for word in wordlist:
        # Wait on the semaphore before adding more tasks
        await self.sem.acquire()
        host = '{}.{}'.format(word, domain)
        task = asyncio.ensure_future(self._dns_lookup(host))
        task.add_done_callback(functools.partial(self._dns_result_callback, host))
        self.tasks.append(task)
    await asyncio.gather(*self.tasks, return_exceptions=True)
Before I implemented the semaphore, the program would just crash, running out of memory while I was queueing up all the tasks. Now it runs for a while and then crashes, again out of memory, about halfway through the requests.
I assume this is because, after a future has been processed by my callback, it just sits there in memory wasting space. My problem is that I cannot figure out how to delete a processed future once I'm done with it. I've read the asyncio docs and I don't see a destroy/delete method. Am I missing something really obvious?
Thanks for your help!
Upvotes: 2
Views: 3965
Reputation: 5
https://docs.python.org/3.12/library/asyncio-task.html#creating-tasks
For a truly background task where you don't need any post-processing, the asyncio documentation recommends adding a done callback that discards the task:
background_tasks = set()

for i in range(10):
    task = asyncio.create_task(some_coro(param=i))

    # Add task to the set. This creates a strong reference.
    background_tasks.add(task)

    # To prevent keeping references to finished tasks forever,
    # make each task remove its own reference from the set after
    # completion:
    task.add_done_callback(background_tasks.discard)
For your situation, you would instead want to call self.tasks.discard(future) in your self._dns_result_callback after you have finished processing the result. (Note that discard is a set method, so self.tasks would need to be a set rather than a list.)
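To make that concrete, here is a rough sketch of what such a callback could look like. It assumes self.tasks has been changed to a set, that the future arrives as the last argument because of the functools.partial used in the question, and that the semaphore is released here rather than inside _dns_lookup; the result handling is only a placeholder.
def _dns_result_callback(self, host, future):
    """Sketch only: handle the finished lookup, then drop the reference to the future."""
    try:
        result = future.result()    # re-raises if the lookup failed
        # ... record the result for 'host' here ...
    except Exception:
        pass                        # ... log or count the failure here ...
    finally:
        self.tasks.discard(future)  # assumes self.tasks is a set, not a list
        self.sem.release()          # assumption: the semaphore slot is freed here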
Upvotes: 0
Reputation: 2852
Do you need to store the tasks for some reason? As I understand it, each task object is kept by the event loop while it is pending. When you store another copy in self.tasks, there will still be a reference to it (in self.tasks) even after the event loop has finished with it, and that keeps the finished future from being garbage collected.
I would guess the code would work just as well, with less memory usage, if you changed self.tasks to a local tasks variable that you pass to gather and then let go out of scope. This would be more efficient (and more readable) than removing each future in the callback, but would have the same net effect.
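As a sketch of that suggestion, keeping the names and the functools.partial callback from the question (and assuming asyncio and functools are imported as before), the method could look like this:
async def _process_dns_wordlist(self, wordlist, domain):
    """Takes a list of words and adds them to the task list as space is available."""
    tasks = []                  # local list instead of self.tasks
    for word in wordlist:
        # Wait on the semaphore before adding more tasks
        await self.sem.acquire()
        host = '{}.{}'.format(word, domain)
        task = asyncio.ensure_future(self._dns_lookup(host))
        task.add_done_callback(functools.partial(self._dns_result_callback, host))
        tasks.append(task)
    await asyncio.gather(*tasks, return_exceptions=True)
    # 'tasks' goes out of scope when this coroutine returns, so nothing keeps
    # holding on to the finished futures afterwards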
Upvotes: 2
Reputation: 307
It turns out that the answer was pretty simple, although I'm not sure whether this is the correct way to do it. In my callback, after I was done processing the result, I did this:
self.tasks.remove(future)
This successfully solved my memory problem. If you have a better way to deal with this, please let me know!
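For reference, with the functools.partial from the question the callback receives the future as its final argument, so the change amounts to something like this (the result handling is a placeholder):
def _dns_result_callback(self, host, future):
    # ... existing processing of the result for 'host' ...
    self.tasks.remove(future)  # drop the reference so the finished future can be garbage collected
One thing to keep in mind is that list.remove does a linear scan of self.tasks on every completion; making self.tasks a set and calling discard, as suggested in the answer above, avoids that.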
Upvotes: 1