Reputation: 178
I am trying to run a heavy computation, test(), with apply_async from the multiprocessing module. The results are collected in a dictionary, pool_out, via the callback function log_result.
However, when I submit over a million jobs with apply_async, enqueueing to the pool takes too long, and the time grows quadratically with the number of jobs:
from os import cpu_count
from multiprocessing import Pool
from functools import partial
from time import time

def test(i):
    return i + 10

def log_result(i, result):
    pool_out[i] = result

pool_out = {}
pool = Pool(cpu_count())
t = time()
try:
    print("i\t total queue size \t enqueue time")
    for i in range(10000000):
        if i % 10000 == 0:
            print(str(i) + "\t\t" + str(pool._taskqueue.qsize()) \
                + "\t\t (%.3f sec)" % (time() - t))
            t = time()
        if i not in list(pool_out.keys()):
            pool.apply_async(test, args=(i,), callback=partial(log_result, i))
    pool.close()
    pool.join()
except KeyboardInterrupt:
    pool.terminate()
Output:
i total queue size enqueue time
0 0 (0.001 sec)
10000 9986 (0.061 sec)
20000 19974 (0.060 sec)
30000 29967 (0.048 sec)
40000 39959 (0.054 sec)
...
2000000 1983601 (1.890 sec)
2010000 1993429 (1.873 sec)
2020000 2002977 (1.966 sec)
So how can I limit the number of tasks waiting for execution in the pool's queue?
Here is what I have tried.
Obviously the problem goes away if I reduce the number of jobs, so I can submit them in chunks:
for j in range(100):
    pool_out = {}
    pool = Pool(cpu_count())
    for i in range(100000*j, 100000*(j+1)):
        if i % 10000 == 0:
            print(str(i) + "\t\t" + str(pool._taskqueue.qsize()) \
                + "\t\t (%.3f sec)" % (time() - t))
            t = time()
        if i not in list(pool_out.keys()):
            pool.apply_async(test, args=(i,), callback=partial(log_result, i))
    pool.close()
    pool.join()
instead of
for i in range(10000000):
    if i % 10000 == 0:
        print(str(i) + "\t\t" + str(pool._taskqueue.qsize()) \
            + "\t\t (%.3f sec)" % (time() - t))
        t = time()
    if i not in list(pool_out.keys()):
        pool.apply_async(test, args=(i,), callback=partial(log_result, i))
pool.close()
pool.join()
However, it does not seem to be a neat solution.
Upvotes: 0
Views: 270
Reputation: 44108
Change
if i not in list(pool_out.keys()):
to:
if i not in pool_out:
Your version turns what should be an (essentially) O(1) membership test into an O(N) operation, and it is space-inefficient as well: list(pool_out.keys()) copies every key on every iteration. N gets very large after a while, so this inefficiency keeps growing.
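To see the difference, here is a small sketch (timed_lookup is a hypothetical helper, and the dictionary size is arbitrary) comparing the two membership tests:

```python
import time

def timed_lookup(d, key, use_list):
    """Return (found, elapsed_seconds) for one membership test."""
    t = time.perf_counter()
    if use_list:
        found = key in list(d.keys())  # O(N): copies all keys into a list first
    else:
        found = key in d               # O(1): a single hash lookup
    return found, time.perf_counter() - t

d = {i: i + 10 for i in range(2_000_000)}
_, slow = timed_lookup(d, 1_999_999, use_list=True)
_, fast = timed_lookup(d, 1_999_999, use_list=False)
print(slow > fast)  # True: the list copy dominates at this size
```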
Upvotes: 1