Reputation: 61
I have a dictionary of 15 variables, each with 3 values, for which I need to generate the product of all possible combinations (3**15 = ~14.3M combinations). I'm using multi-threading on a 12-core processor to process the combinations (and will likely jump to 64 cores).
I'm using itertools.product to generate the different combinations, and ThreadPool with imap_unordered to run the multiprocessing. Additionally, I'm using deque to consume each result as soon as it's available. However, I'm finding that memory consumption blows up to about 2.5GB. I understand that itertools.product is an iterator and therefore should not be storing much data in memory, but that doesn't seem to be the case.
Below is my code, and I'm wondering if anyone can help me figure out how I can better optimize the memory utilization.
Additionally, I'm wondering how the chunk size in imap_unordered plays a role in memory efficiency. I tried different values (10, 100, 1000, 10000) to see how it affects memory usage, but it doesn't seem to have much impact other than stabilizing memory utilization at around 2.5GB. If I don't specify a chunk size, memory tends to blow up past 5GB.
I also tried changing the number of threads from 12 to 1, and that did not impact memory usage either. However, the single-process implementation (commented out below) reduces memory usage to only ~30MB.
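For reference, the resident memory of the running process can be sampled with a small helper like the sketch below (it uses the third-party psutil package and is illustrative only, not part of the script that follows):

import threading
import time

import psutil  # third-party; assumed available for this illustration

def report_memory(interval=5.0):
    # Print the current process's resident set size every `interval` seconds.
    proc = psutil.Process()
    while True:
        print(f"RSS: {proc.memory_info().rss / 1e6:.1f} MB")
        time.sleep(interval)

# Start as a daemon thread before kicking off the pool so it keeps
# sampling for the lifetime of the run.
threading.Thread(target=report_memory, daemon=True).start()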
import numpy as np
import itertools
import multiprocessing
import queue
import functools
from multiprocessing import pool, dummy

def dummy_func(values, keys):
    print( dict(zip(keys, values)) )
    return

def main():
    num_threads = multiprocessing.cpu_count()
    parameters = {'a': ['7.0', '9.0', '11.0'], 'b': ['125p', '200p', '275p'],
                  'c': ['320n', '440n', '560n'], 'd': ['400p', '500p', '600p'],
                  'e': ['262p', '374p', '486p'], 'f': ['13p', '25p', '37p'],
                  'g': ['19p', '40p', '61p'], 'h': ['7p', '16p', '22p'],
                  'i': ['7p', '16p', '22p'],
                  'j': ['0.7200000000000004', '1.1500000000000008', '1.5700000000000012'],
                  'k': ['4', '8', '11'], 'l': ['41', '77', '113'], 'm': ['4', '8', '11'],
                  'n': ['16p', '31p', '46p'], 'o': ['20n', '30n', '35n']}
    keys = list(parameters)

    # process simulations for all permutations using single process
    #for values in itertools.product(*map(parameters.get, keys)):
    #    dummy_func(values, keys)

    # process simulations for all permutations using multi-threading
    with multiprocessing.pool.ThreadPool(num_threads) as workers:
        queue.deque(workers.imap_unordered(functools.partial(dummy_func, keys=keys),
                                           itertools.product(*map(parameters.get, keys)), 100))
    return

if __name__ == "__main__":
    main()
Upvotes: 2
Views: 389
Reputation: 44283
Update 2
I have of late learned that if you are using the methods multiprocessing.pool.Pool.imap or multiprocessing.pool.Pool.imap_unordered, then there is no need to use the special BoundedQueueProcessPool class I developed, whose purpose is to prevent tasks from being submitted to the processing pool's task queue faster than the pool's processes can run them, which makes memory explode. These two methods iterate the passed iterable only as fast as necessary to keep the pool's processes from remaining idle, so task submission is already throttled. However, if you were, for example, submitting tasks in a loop with the multiprocessing.pool.Pool.apply_async method, the task queue could grow extremely large without the throttling that BoundedQueueProcessPool provides. So, to simplify the code, we will be using the standard classes.
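As a quick way to observe the submission behaviour, the input iterable can be wrapped in a counting generator so you can compare how many items the pool has pulled against how many results have come back. This is a throwaway sketch (slow_task and the counter are purely illustrative and not part of the code in question):

import multiprocessing
import time

def slow_task(x):
    time.sleep(0.001)  # simulate a unit of real work
    return x

def counting(iterable, counter):
    # Generator that records how many items the pool has pulled so far.
    # It is iterated in the parent process, so a plain dict is sufficient.
    for item in iterable:
        counter['pulled'] += 1
        yield item

if __name__ == '__main__':
    counter = {'pulled': 0}
    completed = 0
    with multiprocessing.Pool(4) as pool:
        for _ in pool.imap_unordered(slow_task, counting(range(20_000), counter), 100):
            completed += 1
            if completed % 2_000 == 0:
                print(f"completed={completed:6d}  pulled from iterable={counter['pulled']:6d}")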
If you do not want to blow up memory you need 2 things:

1. An iterable passed to the pool along with dummy_func that generates the values incrementally. itertools.product actually generates all the values in memory before yielding the first value, so it will blow up memory regardless of anything else you do.
2. A deque initialized with a suitable non-zero maxlen argument. Your current code is initializing the deque with the complete output of the map function, which will have the length of the passed iterable. This will blow up memory. (See the short illustration below.)

To overcome the problem described in 1, I am using the permutations generator function.
To overcome the problem described in 2, I have initialized an empty deque with maxlen=10. As each value is returned from dummy_func, I will append it to the deque.
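For reference, here is a minimal illustration of the difference between the two deque usages, with throwaway integers standing in for real results:

from collections import deque

# Unbounded: materializes every element of the iterable before returning.
everything = deque(range(1_000_000))   # holds all 1,000,000 entries

# Bounded: only the newest maxlen entries are retained; older ones are
# discarded automatically as new values are appended.
recent = deque(maxlen=10)
for value in range(1_000_000):
    recent.append(value)

print(len(everything), len(recent))    # -> 1000000 10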
import multiprocessing
from functools import partial
import queue
from itertools import permutations

def dummy_func(values, keys):
    #print( dict(zip(keys, values)))
    ...
    return dict(zip(keys, values))

def main():
    num_threads = multiprocessing.cpu_count()
    parameters = {'a': ['7.0', '9.0', '11.0'], 'b': ['125p', '200p', '275p'],
                  'c': ['320n', '440n', '560n'], 'd': ['400p', '500p', '600p'],
                  'e': ['262p', '374p', '486p'], 'f': ['13p', '25p', '37p'],
                  'g': ['19p', '40p', '61p'], 'h': ['7p', '16p', '22p'],
                  'i': ['7p', '16p', '22p'],
                  'j': ['0.7200000000000004', '1.1500000000000008', '1.5700000000000012'],
                  'k': ['4', '8', '11'], 'l': ['41', '77', '113'], 'm': ['4', '8', '11'],
                  'n': ['16p', '31p', '46p'], 'o': ['20n', '30n', '35n']
                  }
    # A more reasonably sized parameters dict (overrides the full one above):
    parameters = {'a': ['7.0', '9.0', '11.0'], 'b': ['125p', '200p', '275p'],
                  'c': ['320n', '440n', '560n'], 'd': ['400p', '500p', '600p'],
                  'e': ['262p', '374p', '486p'], 'f': ['13p', '25p', '37p'],
                  'g': ['19p', '40p', '61p'], 'h': ['7p', '16p', '22p'],
                  'i': ['7p', '16p', '22p'],
                  'j': ['0.7200000000000004', '1.1500000000000008', '1.5700000000000012'],
                  }
    keys = list(parameters)

    # process simulations for all permutations using single process
    #for values in itertools.product(*map(parameters.get, keys)):
    #    dummy_func(values, keys)

    # keep only the 10 most recent results
    q = queue.deque(maxlen=10)
    pool = multiprocessing.Pool(num_threads)
    for v in pool.imap(partial(dummy_func, keys=keys),
                       permutations(parameters.values(), len(keys))):
        q.append(v)
    return q

if __name__ == '__main__':
    import time

    t = time.time()
    q = main()
    print(q)
    print(time.time() - t)
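As a side note, the pool above is never explicitly shut down. A minor variation (names taken from the code above, so this is a fragment rather than a complete script) is to run the loop inside a with block, which cleans up the worker processes automatically once all results have been consumed:

    with multiprocessing.Pool(num_threads) as pool:
        # Same loop as above; the workers are shut down when the
        # with block exits.
        for v in pool.imap(partial(dummy_func, keys=keys),
                           permutations(parameters.values(), len(keys))):
            q.append(v)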
Upvotes: 1