Seung Hyeon Yu
Seung Hyeon Yu

Reputation: 178

Multiprocessing apply_async with over million processes becomes slower

Now I try to compute some heavy calculation called test() using apply_async from multiprocessing module. The results are stacked in the dictionary called pool_out with callback function log_result.

However, when I run over a million jobs to apply_async, it takes too much time to enqueue to the pool, and the time increases quadratically with the number of jobs as follows:

from os import cpu_count
from multiprocessing import Pool
from functools import partial
from time import time

def test(i):
    return i + 10
def log_result(i, result):
    pool_out[i] = result

pool_out = {}
pool = Pool(cpu_count())
t = time()
try:
    print("i\t total queue size \t enqueue time")
    for i in range(10000000):
        if i % 10000 == 0:
            print(str(i) + "\t\t" + str(pool._taskqueue.qsize()) \
                  + "\t\t (%.3f sec)"%(time() - t))
            t = time()
        if i not in list(pool_out.keys()):
            pool.apply_async(test, args=(i,), callback=partial(log_result,i))
    pool.close()
    pool.join()
except KeyboardInterrupt:
    pool.terminate()

Output:

i    total queue size    enqueue time
0       0        (0.001 sec)
10000       9986         (0.061 sec)
20000       19974        (0.060 sec)
30000       29967        (0.048 sec)
40000       39959        (0.054 sec)
...
2000000     1983601      (1.890 sec)
2010000     1993429      (1.873 sec)
2020000     2002977      (1.966 sec)

enter image description here

So how can we limit the number of queue waiting for executing in pool?


Here what I have tried.

It is trivial that the problem can be solved if I reduce the number of jobs. So, I can chunk the number of jobs using

for j in range(100):
    pool_out = {}
    pool = Pool(cpu_count())
    for i in range(100000*j, 100000*(j+1)):
        if i % 10000 == 0:
            print(str(i) + "\t\t" + str(pool._taskqueue.qsize()) \
                  + "\t\t (%.3f sec)"%(time() - t))
            t = time()
        if i not in list(pool_out.keys()):
            pool.apply_async(test, args=(i,), callback=partial(log_result,i))
    pool.close()
    pool.join()

instead of

for i in range(10000000):
    if i % 10000 == 0:
        print(str(i) + "\t\t" + str(pool._taskqueue.qsize()) \
              + "\t\t (%.3f sec)"%(time() - t))
        t = time()
    if i not in list(pool_out.keys()):
        pool.apply_async(test, args=(i,), callback=partial(log_result,i))
pool.close()
pool.join()

However, it does not seem to be a neat solution.

Upvotes: 0

Views: 270

Answers (1)

Booboo
Booboo

Reputation: 44108

Change

if i not in list(pool_out.keys()):

to:

if i not in pool_out:

This becomes a bigger and bigger inefficiency. What should be an O(1) operation (more or less) is an O(N) operation (and space inefficient) the way you are doing it and N gets very large after a while.

Upvotes: 1

Related Questions