David

Reputation: 61

Implementation of memory-efficient ThreadPool for large-scale itertools product

I have a dictionary of 15 variables, each with 3 values, for which I need to generate the product of all possible combinations (3**15 ≈ 14.3M combinations). I'm using multi-threading on a 12-core processor to process the combinations (likely moving to 64 cores).

I'm using itertools.product to generate the different combinations, and ThreadPool with imap_unordered to distribute the work across threads. Additionally, I'm using deque to discard each result as soon as it's available. However, I'm finding that memory consumption blows up to about 2.5GB. I understand that itertools.product returns an iterator and therefore should not be storing much data in memory, but that doesn't seem to be the case.

Below is my code, and I'm wondering if anyone can help me figure out how I can better optimize the memory utilization.

Additionally, I'm wondering how the chunk size in imap_unordered plays a role in memory efficiency. I tried different values to see how it affects memory usage (including 10, 100, 1000, 10000), but it doesn't seem to have much impact other than stabilizing memory utilization at around 2.5GB. If I don't specify the chunk size, memory tends to blow up to >5GB.

I also tried changing the number of threads from 12 to 1, and that also did not impact the memory usage. However, using the single-processor implementation (commented out below) reduces the memory usage to only ~30MB.

import numpy as np
import itertools
import multiprocessing
import queue
import functools
from multiprocessing import pool, dummy

def dummy_func(values, keys):
    print( dict(zip(keys, values)) )
    return

def main():
    num_threads = multiprocessing.cpu_count()

    parameters = {'a': ['7.0', '9.0', '11.0'], 'b': ['125p', '200p', '275p'], 
                  'c': ['320n', '440n', '560n'], 'd': ['400p', '500p', '600p'], 
                  'e': ['262p', '374p', '486p'], 'f': ['13p', '25p', '37p'], 
                  'g': ['19p', '40p', '61p'], 'h': ['7p', '16p', '22p'], 
                  'i': ['7p', '16p', '22p'], 
                  'j': ['0.7200000000000004', '1.1500000000000008', '1.5700000000000012'], 
                  'k': ['4', '8', '11'], 'l': ['41', '77', '113'], 'm': ['4', '8', '11'], 
                  'n': ['16p', '31p', '46p'], 'o': ['20n', '30n', '35n']}
    keys = list(parameters)

    # process simulations for all permutations using single process
    #for values in itertools.product(*map(parameters.get, keys)):
    #    dummy_func(values, keys)

    # process simulations for all permutations using multi-threading
    with multiprocessing.pool.ThreadPool(num_threads) as workers:
        queue.deque(workers.imap_unordered(functools.partial(dummy_func, keys=keys), 
                                           itertools.product(*map(parameters.get, keys)), 100))
    return

if __name__ == "__main__":
    main()

Upvotes: 2

Views: 389

Answers (1)

Booboo

Reputation: 44283

Update 2

I have lately learned that if you are using the methods multiprocessing.pool.Pool.imap or multiprocessing.pool.Pool.imap_unordered, there is no need for the special BoundedQueueProcessPool class I developed to prevent tasks from being submitted to the pool's task queue faster than the pool's processes can run them, which causes memory to explode. These two methods iterate the passed iterable only fast enough to keep the pool processes from remaining idle, so task submission is already throttled. However, if you were, for example, submitting tasks in a loop with the multiprocessing.pool.Pool.apply_async method, the task queue could grow extremely large without the throttling that BoundedQueueProcessPool provides. So, to simplify the code, we will be using the standard classes.
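
To illustrate the kind of throttling that matters with apply_async, here is a minimal sketch. It is not the BoundedQueueProcessPool class mentioned above (its code is not shown here); it only demonstrates the same idea, assuming a ThreadPool and a semaphore that caps the number of submitted-but-unfinished tasks. The names run_throttled, square, and max_pending are made up for this example.

```python
import threading
from multiprocessing.pool import ThreadPool

def square(x):
    # Hypothetical stand-in for the real work.
    return x * x

def run_throttled(func, iterable, workers=4, max_pending=8):
    """Submit tasks with apply_async while never allowing more than
    max_pending submitted-but-unfinished tasks at any time."""
    slots = threading.BoundedSemaphore(max_pending)
    lock = threading.Lock()
    state = {"pending": 0, "peak": 0}
    results = []

    def on_done(result):
        # Called by the pool when a task finishes successfully.
        with lock:
            results.append(result)
            state["pending"] -= 1
        slots.release()

    def on_error(exc):
        # Free the slot on failure too, otherwise the submit loop could stall.
        with lock:
            state["pending"] -= 1
        slots.release()

    with ThreadPool(workers) as pool:
        for item in iterable:
            slots.acquire()  # blocks while max_pending tasks are outstanding
            with lock:
                state["pending"] += 1
                state["peak"] = max(state["peak"], state["pending"])
            pool.apply_async(func, (item,), callback=on_done,
                             error_callback=on_error)
        pool.close()
        pool.join()  # wait for all outstanding tasks and callbacks
    return results, state["peak"]

results, peak = run_throttled(square, range(100))
```

The iterable is only advanced when a slot is free, so the number of queued tasks stays bounded no matter how long the input is. The same pattern should carry over to a process pool, provided the task function is picklable and the script uses the usual `if __name__ == '__main__':` guard.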

If you do not want to blow up memory you need 2 things:

  1. You need an iterable that generates the values passed to dummy_func incrementally. Be aware that itertools.product fully consumes its input iterables and keeps them in memory as pools before yielding the first value, so very large inputs will cost memory regardless of anything else you do.
  2. You must process the results one by one, appending each result to a deque that was initialized with a suitable non-zero maxlen argument. Your current code initializes the deque with the complete output of the map function, which has the same length as the passed iterable. This will blow up memory.

To overcome the problem described in 1, I am using the permutations generator function.

To overcome the problem described in 2, I have initialized an empty deque with maxlen=10. As each value is returned from dummy_func, I append it to the deque.
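
The effect of maxlen can be seen in isolation with a small sketch. The squares generator below is a hypothetical stand-in for the stream of results coming back from the pool; it is not part of the original code.

```python
from collections import deque

def squares(n):
    # Hypothetical generator standing in for results arriving from a pool.
    for i in range(n):
        yield i * i

# Bounded deque: once 10 items are held, each append drops the oldest one.
last_results = deque(maxlen=10)
for value in squares(1_000):
    last_results.append(value)

# maxlen=0 consumes the whole iterator but retains nothing.
discard_all = deque(squares(1_000), maxlen=0)

# No maxlen: every item is kept, so memory grows with the input length.
keep_all = deque(squares(1_000))
```

Only the bounded variants keep memory flat as the number of results grows; the unbounded form holds one entry per task.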

import multiprocessing
from functools import partial
import queue
from itertools import permutations

def dummy_func(values, keys):
    #print( dict(zip(keys, values)))
    ...
    return dict(zip(keys, values))

def main():
    num_threads = multiprocessing.cpu_count()

    parameters = {'a': ['7.0', '9.0', '11.0'], 'b': ['125p', '200p', '275p'],
                  'c': ['320n', '440n', '560n'], 'd': ['400p', '500p', '600p'],
                  'e': ['262p', '374p', '486p'], 'f': ['13p', '25p', '37p'],
                  'g': ['19p', '40p', '61p'], 'h': ['7p', '16p', '22p'],
                  'i': ['7p', '16p', '22p'],
                  'j': ['0.7200000000000004', '1.1500000000000008', '1.5700000000000012'],
                  'k': ['4', '8', '11'], 'l': ['41', '77', '113'], 'm': ['4', '8', '11'],
                  'n': ['16p', '31p', '46p'], 'o': ['20n', '30n', '35n']
                  }

    # A more reasonably sized parameters:
    parameters = {'a': ['7.0', '9.0', '11.0'], 'b': ['125p', '200p', '275p'],
                  'c': ['320n', '440n', '560n'], 'd': ['400p', '500p', '600p'],
                  'e': ['262p', '374p', '486p'], 'f': ['13p', '25p', '37p'],
                  'g': ['19p', '40p', '61p'], 'h': ['7p', '16p', '22p'],
                  'i': ['7p', '16p', '22p'],
                  'j': ['0.7200000000000004', '1.1500000000000008', '1.5700000000000012'],
                  }


    keys = list(parameters)

    # process simulations for all permutations using single process
    #for values in itertools.product(*map(parameters.get, keys)):
    #    dummy_func(values, keys)

    q = queue.deque(maxlen=10)

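    # Use the pool as a context manager so the worker processes are cleaned up on exit.
    with multiprocessing.Pool(num_threads) as pool:
        # Note: permutations() here yields orderings of the value lists
        # themselves, not the Cartesian product of their elements.
        for v in pool.imap(partial(dummy_func, keys=keys), permutations(parameters.values(), len(keys))):
            q.append(v)
    return q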

if __name__ == '__main__':
    import time
    t = time.time()
    q = main()
    print(q)
    print(time.time() - t)
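
As a smaller variation on the same pattern, here is a sketch that assumes a ThreadPool (as in the question) and a deliberately tiny, hypothetical 4-parameter grid, so that the combinations can be counted by hand (3**4 = 81). The function names label and run, and the workers, chunksize, and keep defaults, are made up for this example.

```python
import itertools
from collections import deque
from functools import partial
from multiprocessing.pool import ThreadPool

def label(values, keys):
    # Stand-in for the real simulation: pair each key with its chosen value.
    return dict(zip(keys, values))

def run(parameters, workers=4, chunksize=10, keep=5):
    keys = list(parameters)
    # Combinations are drawn from this iterator as the pool consumes it.
    combos = itertools.product(*(parameters[k] for k in keys))
    recent = deque(maxlen=keep)  # retain only the last few results
    count = 0
    with ThreadPool(workers) as pool:
        worker = partial(label, keys=keys)
        for result in pool.imap_unordered(worker, combos, chunksize):
            recent.append(result)
            count += 1
    return count, recent

small = {'a': ['7.0', '9.0', '11.0'], 'b': ['125p', '200p', '275p'],
         'c': ['320n', '440n', '560n'], 'd': ['400p', '500p', '600p']}
count, recent = run(small)
```

Each result is handled as it arrives and only the last few are retained. Whether this keeps memory flat at the full 15-parameter scale would still need to be measured.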

Upvotes: 1
