I am using ThreadPoolExecutor to download a huge number (~400k) of keyframe images. The keyframe names are stored in a text file (say, keyframes_list.txt).
I have modified the example provided in the documentation and it seems to work flawlessly, with one exception: the example maps every link to a future object, and all the futures are collected in a single iterable (a dict(), to be precise). This iterable is then passed to the as_completed() function to check when a future is completed. That approach requires all the entries to be loaded into memory at once; my Python process for this task takes up 1 GB of RAM.
The full code is shown below:
import concurrent.futures
import requests

def download_keyframe(keyframe_name):
    url = 'http://server/to//Keyframes/{}.jpg'.format(keyframe_name)
    r = requests.get(url, allow_redirects=True)
    with open('path/to/be/saved/keyframes/{}.jpg'.format(keyframe_name), 'wb') as f:
        f.write(r.content)
    return True
keyframes_list_path = '/path/to/keyframes_list.txt'
future_to_url = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    with open(keyframes_list_path, 'r') as f:
        for i, line in enumerate(f):
            fields = line.split('\t')
            keyframe_name = fields[0]
            future_to_url[executor.submit(download_keyframe, keyframe_name)] = keyframe_name
    for future in concurrent.futures.as_completed(future_to_url):
        keyframe_name = future_to_url[future]
        try:
            future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (keyframe_name, exc))
        else:
            print('Keyframe: {} was downloaded.'.format(keyframe_name))
So, my question is: how can I provide an iterable to as_completed() while also keeping the memory footprint low? I have considered using a queue, but I am not sure it cooperates smoothly with ThreadPoolExecutor. Is there an easy way to control the number of futures submitted to the ThreadPoolExecutor?
The answer by AdamKG is a good start, but his code will wait until a chunk has been processed completely before starting to process the next chunk, so you lose some performance.
I suggest a slightly different approach that feeds a continuous stream of tasks to the executor while enforcing an upper bound on the number of parallel tasks, in order to keep the memory footprint low.
The trick is to use concurrent.futures.wait to keep track of the futures that have completed and those that are still pending:
import concurrent.futures
import requests

def download_keyframe(keyframe_name):
    try:
        url = 'http://server/to//Keyframes/{}.jpg'.format(keyframe_name)
        r = requests.get(url, allow_redirects=True)
        with open('path/to/be/saved/keyframes/{}.jpg'.format(keyframe_name), 'wb') as f:
            f.write(r.content)
    except Exception as e:
        return keyframe_name, e
    return keyframe_name, None

keyframes_list_path = '/path/to/keyframes_list.txt'
MAX_WORKERS = 8

with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    with open(keyframes_list_path, 'r') as fh:
        futures_notdone = set()
        futures_done = set()

        for i, line in enumerate(fh):
            # Submit new task to executor.
            fields = line.split('\t')
            keyframe_name = fields[0]
            futures_notdone.add(executor.submit(download_keyframe, keyframe_name))

            # Enforce upper bound on number of parallel tasks.
            if len(futures_notdone) >= MAX_WORKERS:
                done, futures_notdone = concurrent.futures.wait(
                    futures_notdone, return_when=concurrent.futures.FIRST_COMPLETED)
                futures_done.update(done)

    # Wait for the tasks still in flight once the file is exhausted,
    # so their results are not silently dropped.
    futures_done.update(concurrent.futures.wait(futures_notdone).done)

# Process results.
for future in futures_done:
    keyframe_name, exc = future.result()
    if exc:
        print('%r generated an exception: %s' % (keyframe_name, exc))
    else:
        print('Keyframe: {} was downloaded.'.format(keyframe_name))
Of course, you could also process the results inside the loop in order to empty futures_done from time to time, for example whenever the number of items in it exceeds 1000 (or any other amount that fits your needs). This might come in handy if your dataset is very large and the results alone would take up a lot of memory.
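A minimal sketch of that variant, reusing download_keyframe, keyframes_list_path, and MAX_WORKERS from above (the 1000-item threshold and the drain() helper are just illustrative choices, not part of any library):

def drain(futures_done):
    # Hypothetical helper: report results and empty the set in place.
    for future in futures_done:
        keyframe_name, exc = future.result()
        if exc:
            print('%r generated an exception: %s' % (keyframe_name, exc))
        else:
            print('Keyframe: {} was downloaded.'.format(keyframe_name))
    futures_done.clear()

with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    with open(keyframes_list_path, 'r') as fh:
        futures_notdone = set()
        futures_done = set()
        for line in fh:
            keyframe_name = line.split('\t')[0]
            futures_notdone.add(executor.submit(download_keyframe, keyframe_name))
            if len(futures_notdone) >= MAX_WORKERS:
                done, futures_notdone = concurrent.futures.wait(
                    futures_notdone, return_when=concurrent.futures.FIRST_COMPLETED)
                futures_done.update(done)
            # Drain completed results periodically so futures_done
            # never grows without bound.
            if len(futures_done) >= 1000:
                drain(futures_done)
    # Collect the tasks still in flight, then report the remainder.
    futures_done.update(concurrent.futures.wait(futures_notdone).done)
drain(futures_done)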
If we look at the source for as_completed(), the first thing it does is evaluate whatever iterable you pass as the first argument, on line 221, with fs = set(fs). So as long as you're reading and queuing the entire file at once, as_completed() is going to load all of those Future instances into memory when you call it.
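As a quick, standalone illustration (not from the original post): even handing as_completed() a lazy generator doesn't help, because the set(fs) call consumes it up front:

import concurrent.futures

def work(x):
    return x * x

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # Creating the generator is lazy, but as_completed() immediately
    # materializes it via set(fs), so all five tasks are submitted and
    # all five Future objects are in memory before the first result
    # is yielded.
    futures = (executor.submit(work, i) for i in range(5))
    for future in concurrent.futures.as_completed(futures):
        print(future.result())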
To get around it, you need to chunk the input and only call as_completed() with a subset of the futures on each iteration. You can use the grouper snippet from this answer (a self-contained sketch of it is given after the code); chunks of ~1k should keep your thread pool saturated while not consuming excessive memory. Your final code, starting with the with-block for the ThreadPoolExecutor, should look something like this:
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    with open(keyframes_list_path, 'r') as fh:
        for lines in grouper(fh, 1000):
            # reset the dict that as_completed() will check on every iteration
            future_to_url = {}
            for i, line in enumerate(lines):
                if line is None:
                    # grouper pads the last chunk with None; skip the padding.
                    continue
                fields = line.split('\t')
                keyframe_name = fields[0]
                future_to_url[executor.submit(download_keyframe, keyframe_name)] = keyframe_name
            for future in concurrent.futures.as_completed(future_to_url):
                keyframe_name = future_to_url[future]
                try:
                    future.result()
                except Exception as exc:
                    print('%r generated an exception: %s' % (keyframe_name, exc))
                else:
                    print('Keyframe: {} was downloaded.'.format(keyframe_name))
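The grouper snippet itself is not reproduced above; presumably it is the standard itertools grouper recipe, which collects an iterable into fixed-length chunks and pads the final chunk with a fill value (hence the None check in the loop above):

from itertools import zip_longest

def grouper(iterable, n, fillvalue=None):
    # itertools recipe: collect data into fixed-length chunks.
    # The last chunk is padded with fillvalue when the input runs out.
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)

With chunks of 1000, at most about a thousand Future objects are alive at any time, instead of ~400k in the original version.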