Reputation: 1147
When using a Python multiprocessing pool, how many jobs are submitted?
How is it decided? Can we control it somehow, for example by allowing at most 10 jobs in the queue to reduce memory usage?
Assume I have the skeleton code below. For each chrom and simulation, I read the data as a pandas DataFrame.
(I thought that reading the data before submitting the job would be better, to reduce I/O in the worker processes.)
Then I send the pandas DataFrame to a worker to process it.
But it seems that many more jobs are submitted than are finalized, and this results in a memory error.
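import multiprocessing

numofProcesses = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=numofProcesses)
jobs = []
all_result1 = {}
all_result2 = {}

# Callback run in the parent process each time a job finishes.
# It is named differently from the two-argument accumulate() helper
# (not shown) that merges one partial result into the running total.
def accumulate_results(result):
    result1 = result[0]
    result2 = result[1]
    accumulate(result1, all_result1)
    accumulate(result2, all_result2)
    print('ACCUMULATE')

# chroms and sims stand for the chromosomes and simulations to process.
for chrom in chroms:
    for sim in sims:
        chrBased_simBased_df = readData(chrom, sim)
        jobs.append(pool.apply_async(func, args=(chrBased_simBased_df, too, many,), callback=accumulate_results))
        print('Submitted job:%d' % (len(jobs)))

pool.close()
pool.join()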
Is there a way to avoid this?
Upvotes: 0
Views: 942
Reputation: 15020
Neither multiprocessing.Pool nor concurrent.futures.ProcessPoolExecutor lets you limit the number of tasks you submit to the workers.
Nevertheless, this is a trivial extension you can build yourself using a semaphore.
You can check an example in this gist. It uses the concurrent.futures module, but it should be trivial to port to multiprocessing.Pool as well.
from threading import BoundedSemaphore
from concurrent.futures import ProcessPoolExecutor


class MaxQueuePool:
    """This Class wraps a concurrent.futures.Executor
    limiting the size of its task queue.

    If `max_queue_size` tasks are submitted, the next call to submit will block
    until a previously submitted one is completed.
    """
    def __init__(self, executor, max_queue_size, max_workers=None):
        self.pool = executor(max_workers=max_workers)
        self.pool_queue = BoundedSemaphore(max_queue_size)

    def submit(self, function, *args, **kwargs):
        """Submits a new task to the pool, blocks if Pool queue is full."""
        self.pool_queue.acquire()

        future = self.pool.submit(function, *args, **kwargs)
        future.add_done_callback(self.pool_queue_callback)

        return future

    def pool_queue_callback(self, _):
        """Called once task is done, releases one queue slot."""
        self.pool_queue.release()


if __name__ == '__main__':
    pool = MaxQueuePool(ProcessPoolExecutor, 8)
    f = pool.submit(print, "Hello World!")
    f.result()
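For the multiprocessing.Pool port mentioned above, a minimal sketch might look like the following. The names BoundedPool, slots and pool_factory are hypothetical and not part of any library; the sketch only assumes the documented Pool.apply_async(func, args, kwds, callback, error_callback) signature. The slot is released from both the success and the error callback, so a failing task does not permanently consume a queue slot.

```python
import multiprocessing
from threading import BoundedSemaphore


class BoundedPool:
    """Hypothetical wrapper: at most `max_queue_size` tasks are pending at once."""

    def __init__(self, max_queue_size, processes=None,
                 pool_factory=multiprocessing.Pool):
        # pool_factory is an assumption of this sketch; any class exposing
        # the Pool API (apply_async, close, join) should work.
        self.pool = pool_factory(processes=processes)
        self.slots = BoundedSemaphore(max_queue_size)

    def apply_async(self, func, args=(), kwds=None,
                    callback=None, error_callback=None):
        """Like Pool.apply_async, but blocks while the queue is full."""
        self.slots.acquire()

        def on_success(result):
            self.slots.release()  # free the slot before user code runs
            if callback is not None:
                callback(result)

        def on_error(exc):
            self.slots.release()  # a failed task frees its slot too
            if error_callback is not None:
                error_callback(exc)

        try:
            return self.pool.apply_async(func, args, kwds or {},
                                         callback=on_success,
                                         error_callback=on_error)
        except BaseException:
            self.slots.release()  # submission itself failed
            raise

    def close(self):
        self.pool.close()

    def join(self):
        self.pool.join()


if __name__ == '__main__':
    pool = BoundedPool(max_queue_size=10)
    handles = [pool.apply_async(pow, (i, 2)) for i in range(100)]
    pool.close()
    pool.join()
    print(sum(h.get() for h in handles))
```

In the question's loop, this would mean that readData is only called for the next chrom and simulation once a slot is free, so no more than max_queue_size DataFrames are waiting in memory at a time. The callbacks run in a helper thread of the parent process, which is why a threading semaphore is sufficient here.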
Upvotes: 2