Reputation: 12017
I have the Python code:
from multiprocessing import Process
def f(name):
    print 'hello', name

if __name__ == '__main__':
    for i in range(0, MAX_PROCESSES):
        p = Process(target=f, args=(i,))
        p.start()
which runs well. However, MAX_PROCESSES is variable and can be any value between 1 and 512. Since I'm only running this code on a machine with 8 cores, I need to find out if it is possible to limit the number of processes allowed to run at the same time. I've looked into multiprocessing.Queue, but it doesn't look like what I need - or perhaps I'm interpreting the docs incorrectly.
Is there a way to limit the number of simultaneous multiprocessing.Process instances running?
Upvotes: 112
Views: 202893
Reputation: 10626
I use multiprocessing to test newly developed code against massive amounts of test data, and I want to get results as fast as possible: if the new code fails for one of the test inputs, I can start developing a fix. While I do so, I want to see how the code performs on the rest of the test data. Then I can potentially change the order in which the test data is processed in the next run (to see failures fast).
The following template collects the results in a pd.DataFrame as soon as they are available:
import sys
import time
import random
from typing import List, Callable, Dict, Any
import multiprocessing as mp
from multiprocessing.managers import DictProxy
import logging
import pandas as pd
N_PROC = mp.cpu_count() - 1 # number of processes you want to run in parallel (others are waiting for semaphore)
MULTIPROCESSING_UPDATE_CYCLE = .1  # how long to wait before checking again whether any job has finished
# logging
DEFAULT_FORMAT = "\n%(levelname)s - %(asctime)s.%(msecs)03d - %(filename)s, l %(lineno)d:\n%(message)s"
DEFAULT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
default_stream_handler = logging.StreamHandler(sys.stdout)
default_stream_handler.setFormatter(logging.Formatter(fmt=DEFAULT_FORMAT, datefmt=DEFAULT_TIME_FORMAT))
logger = logging.getLogger("mp_template")
logger.setLevel(logging.DEBUG)
logger.addHandler(default_stream_handler)
# fix seed
random.seed(42) # a 'not so' arbitrary number
def process_single_task(task_name: str) -> Dict:
"""
This is the slow function you want to parallelize.
Parameters
----------
task_name : str
some input
Returns
-------
Dict :
Returns dictionary of different value produced during execution.
This is overengeneered for this example, but pretty handy for more complex function.
"""
result = {}
n_sec = random.randint(1, 4)
logger.debug(f"start {task_name=}, {n_sec=}")
time.sleep(n_sec)
logger.debug(f"end {task_name=}, {n_sec=}")
result['n_sec'] = n_sec
result['log'] = f"executed {task_name=}"
return result
def fct_to_multiprocessing(
        fct: Callable, fct_kwargs: Dict[str, Any], job_id: int, results: DictProxy, semaphore: mp.Semaphore):
    """
    Function for handling the maximum number of active processes and managing each process's return value.

    Parameters
    ----------
    fct : Callable
        Function to execute in a separate process
    fct_kwargs : Dict[str, Any]
        kwargs for fct
    job_id : int
        id to manage results. The result is stored in results[job_id]
    results : DictProxy
        special mp dict to manage the return values of fct
    semaphore : mp.Semaphore
        semaphore object to prevent more than N_PROC processes running in parallel

    Example
    -------
    Use as follows:

        manager = mp.Manager()
        results = manager.dict()
        sema = mp.Semaphore(N_PROC)
        jobs = {}
        for job_id in ...:
            jobs[job_id] = mp.Process(
                target=fct_to_multiprocessing,
                kwargs={
                    "fct": ..., "fct_kwargs": {...},
                    "job_id": job_id, "results": results, "semaphore": sema
                }
            )
            jobs[job_id].start()
    """
    if semaphore is not None:
        semaphore.acquire()
    results[job_id] = fct(**fct_kwargs)
    if semaphore is not None:
        semaphore.release()
def manage_results(df_results: pd.DataFrame, job_id: int, result: Dict) -> pd.DataFrame:
    df_results.loc[job_id, result.keys()] = result.values()
    logger.info(df_results)
    return df_results
def process_all_tasks(tasks: List[str]):
    logger.info(f"\n\n{''.center(80, '=')}\n{' started '.center(80, '=')}\n{''.center(80, '=')}\n")
    logger.info(f"executing code on {N_PROC} / {mp.cpu_count()} simultaneous processes")

    job_ids = [f"job_id={job_id}" for job_id in tasks]
    df_results = pd.DataFrame(index=job_ids)

    # run jobs
    if N_PROC == 1:  # no parallelization, good for debugging
        for job_id, task in zip(job_ids, tasks):
            result = process_single_task(task_name=task)
            df_results = manage_results(df_results=df_results, job_id=job_id, result=result)
    else:  # parallelization on
        manager = mp.Manager()
        results = manager.dict()
        sema = mp.Semaphore(N_PROC)
        jobs = {}

        for job_id, task in zip(job_ids, tasks):
            jobs[job_id] = mp.Process(
                target=fct_to_multiprocessing,
                kwargs={
                    "fct": process_single_task, "fct_kwargs": {"task_name": task},
                    "job_id": job_id, "results": results, "semaphore": sema
                }
            )
            jobs[job_id].start()

        while jobs:  # as soon as a job is completed, add its result to df_results
            for job_id in jobs.keys():
                job = jobs[job_id]
                if job.exitcode is not None:  # a job has completed
                    job.join()
                    result = results[job_id]
                    job.close()
                    del jobs[job_id]
                    df_results = manage_results(df_results=df_results, job_id=job_id, result=result)
                    break
            time.sleep(MULTIPROCESSING_UPDATE_CYCLE)

    logger.info(f"\n\n{''.center(80, '=')}\n{' finished '.center(80, '=')}\n{''.center(80, '=')}\n")
    logger.info(df_results)
if __name__ == "__main__":
    tasks = list("abcdef")
    process_all_tasks(tasks)
$ python 230315_multiprocessing_template.py
INFO - 2023-03-15T10:51:09.786 - 230315_multiprocessing_template.py, l 98:
================================================================================
=================================== started ====================================
================================================================================
INFO - 2023-03-15T10:51:09.786 - 230315_multiprocessing_template.py, l 99:
executing code on 3 / 4 simultaneous processes
DEBUG - 2023-03-15T10:51:09.794 - 230315_multiprocessing_template.py, l 43:
start task_name='a', n_sec=2
DEBUG - 2023-03-15T10:51:09.794 - 230315_multiprocessing_template.py, l 43:
start task_name='b', n_sec=2
DEBUG - 2023-03-15T10:51:09.796 - 230315_multiprocessing_template.py, l 43:
start task_name='c', n_sec=1
DEBUG - 2023-03-15T10:51:10.797 - 230315_multiprocessing_template.py, l 45:
end task_name='c', n_sec=1
DEBUG - 2023-03-15T10:51:10.798 - 230315_multiprocessing_template.py, l 43:
start task_name='d', n_sec=1
INFO - 2023-03-15T10:51:10.901 - 230315_multiprocessing_template.py, l 94:
n_sec log
job_id=a NaN NaN
job_id=b NaN NaN
job_id=c 1.0 executed task_name='c'
job_id=d NaN NaN
job_id=e NaN NaN
job_id=f NaN NaN
DEBUG - 2023-03-15T10:51:11.796 - 230315_multiprocessing_template.py, l 45:
end task_name='a', n_sec=2
DEBUG - 2023-03-15T10:51:11.796 - 230315_multiprocessing_template.py, l 45:
end task_name='b', n_sec=2
DEBUG - 2023-03-15T10:51:11.797 - 230315_multiprocessing_template.py, l 43:
start task_name='f', n_sec=2
DEBUG - 2023-03-15T10:51:11.798 - 230315_multiprocessing_template.py, l 43:
start task_name='e', n_sec=1
DEBUG - 2023-03-15T10:51:11.798 - 230315_multiprocessing_template.py, l 45:
end task_name='d', n_sec=1
INFO - 2023-03-15T10:51:11.807 - 230315_multiprocessing_template.py, l 94:
n_sec log
job_id=a 2.0 executed task_name='a'
job_id=b NaN NaN
job_id=c 1.0 executed task_name='c'
job_id=d NaN NaN
job_id=e NaN NaN
job_id=f NaN NaN
INFO - 2023-03-15T10:51:11.910 - 230315_multiprocessing_template.py, l 94:
n_sec log
job_id=a 2.0 executed task_name='a'
job_id=b 2.0 executed task_name='b'
job_id=c 1.0 executed task_name='c'
job_id=d NaN NaN
job_id=e NaN NaN
job_id=f NaN NaN
INFO - 2023-03-15T10:51:12.014 - 230315_multiprocessing_template.py, l 94:
n_sec log
job_id=a 2.0 executed task_name='a'
job_id=b 2.0 executed task_name='b'
job_id=c 1.0 executed task_name='c'
job_id=d 1.0 executed task_name='d'
job_id=e NaN NaN
job_id=f NaN NaN
DEBUG - 2023-03-15T10:51:12.799 - 230315_multiprocessing_template.py, l 45:
end task_name='e', n_sec=1
INFO - 2023-03-15T10:51:12.819 - 230315_multiprocessing_template.py, l 94:
n_sec log
job_id=a 2.0 executed task_name='a'
job_id=b 2.0 executed task_name='b'
job_id=c 1.0 executed task_name='c'
job_id=d 1.0 executed task_name='d'
job_id=e 1.0 executed task_name='e'
job_id=f NaN NaN
DEBUG - 2023-03-15T10:51:13.800 - 230315_multiprocessing_template.py, l 45:
end task_name='f', n_sec=2
INFO - 2023-03-15T10:51:13.824 - 230315_multiprocessing_template.py, l 94:
n_sec log
job_id=a 2.0 executed task_name='a'
job_id=b 2.0 executed task_name='b'
job_id=c 1.0 executed task_name='c'
job_id=d 1.0 executed task_name='d'
job_id=e 1.0 executed task_name='e'
job_id=f 2.0 executed task_name='f'
INFO - 2023-03-15T10:51:13.927 - 230315_multiprocessing_template.py, l 140:
================================================================================
=================================== finished ===================================
================================================================================
INFO - 2023-03-15T10:51:13.927 - 230315_multiprocessing_template.py, l 141:
n_sec log
job_id=a 2.0 executed task_name='a'
job_id=b 2.0 executed task_name='b'
job_id=c 1.0 executed task_name='c'
job_id=d 1.0 executed task_name='d'
job_id=e 1.0 executed task_name='e'
job_id=f 2.0 executed task_name='f'
Upvotes: 0
Reputation: 1267
You could use concurrent.futures to do this with a ProcessPoolExecutor. Under the hood, the ProcessPoolExecutor uses Process and Semaphore from multiprocessing very similarly to some of the other answers here. Check it out if you want. I'm adding this answer because it is so far the only example to use the more recent concurrent.futures API to achieve the same thing.
from concurrent.futures import ProcessPoolExecutor, Future, wait
import typing as T

MAX_WORKERS: int = 4
INPUT_SIZE: int = 512

def f(x: int) -> int:
    return x**2

if __name__ == '__main__':  # guard needed so worker processes can import this module safely
    input_vec = range(INPUT_SIZE)

    thread_pool: ProcessPoolExecutor = ProcessPoolExecutor(max_workers=MAX_WORKERS)
    threads: T.List[Future] = []

    print(f'Spreading {INPUT_SIZE} tasks over {MAX_WORKERS} workers...')
    for x in input_vec:
        # ProcessPoolExecutor.submit(callable, *args_to_callable) returns a Future
        threads.append(thread_pool.submit(f, x))

    # wait for all Futures to reach a terminal state
    wait(threads)
    print('All tasks complete!')

    output_vec: T.List[int] = [thread.result() for thread in threads]
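Not part of the original snippet: a shorter, roughly equivalent sketch (same hypothetical f, MAX_WORKERS and INPUT_SIZE) that uses the executor as a context manager together with Executor.map, so waiting for completion happens automatically on exit:
from concurrent.futures import ProcessPoolExecutor
import typing as T

MAX_WORKERS: int = 4
INPUT_SIZE: int = 512

def f(x: int) -> int:
    return x**2

if __name__ == '__main__':
    # the with-block shuts the executor down and waits for all submitted work on exit
    with ProcessPoolExecutor(max_workers=MAX_WORKERS) as pool:
        # Executor.map yields results in input order
        output_vec: T.List[int] = list(pool.map(f, range(INPUT_SIZE)))
    print('All tasks complete!')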
Upvotes: 0
Reputation: 606
I think Semaphore is what you are looking for; it will block the main process after counting down to 0. Sample code:
from multiprocessing import Process
from multiprocessing import Semaphore
import time
def f(name, sema):
    print('process {} starting doing business'.format(name))
    # simulate a time-consuming task by sleeping
    time.sleep(5)
    # `release` will add 1 to `sema`, allowing other
    # processes blocked on it to continue
    sema.release()

if __name__ == '__main__':
    concurrency = 20
    total_task_num = 1000
    sema = Semaphore(concurrency)
    all_processes = []
    for i in range(total_task_num):
        # once 20 processes are running, the following `acquire` call
        # will block the main process since `sema` has been reduced
        # to 0. This loop will continue only after one or more
        # previously created processes complete.
        sema.acquire()
        p = Process(target=f, args=(i, sema))
        all_processes.append(p)
        p.start()

    # inside main process, wait for all processes to finish
    for p in all_processes:
        p.join()
The following code is more structured, since it acquires and releases sema in the same function. However, it will consume too many resources if total_task_num is very large:
from multiprocessing import Process
from multiprocessing import Semaphore
import time
def f(name, sema):
    print('process {} starting doing business'.format(name))
    # `sema` is acquired and released in the same
    # block of code here, making the code more readable,
    # but this may lead to problems.
    sema.acquire()
    time.sleep(5)
    sema.release()

if __name__ == '__main__':
    concurrency = 20
    total_task_num = 1000
    sema = Semaphore(concurrency)
    all_processes = []
    for i in range(total_task_num):
        p = Process(target=f, args=(i, sema))
        all_processes.append(p)
        # the following line won't block after 20 processes
        # have been created and are running; instead it will carry
        # on until all 1000 processes are created.
        p.start()

    # inside main process, wait for all processes to finish
    for p in all_processes:
        p.join()
The above code will create total_task_num processes, but only concurrency of them will be running at any time, while the other processes are blocked, consuming precious system resources.
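If creating a Process object for every one of the total_task_num tasks is a concern, a rough alternative sketch (not part of this answer, and picking up the multiprocessing.Queue idea mentioned in the question) is to start only concurrency long-lived worker processes and feed them tasks through a queue:
from multiprocessing import Process, Queue
import time

def worker(task_queue):
    # each worker pulls tasks until it receives the None sentinel
    for name in iter(task_queue.get, None):
        print('process {} starting doing business'.format(name))
        time.sleep(5)  # simulate a time-consuming task

if __name__ == '__main__':
    concurrency = 20
    total_task_num = 1000

    task_queue = Queue()
    for i in range(total_task_num):
        task_queue.put(i)
    for _ in range(concurrency):
        task_queue.put(None)  # one sentinel per worker so every worker exits

    workers = [Process(target=worker, args=(task_queue,)) for _ in range(concurrency)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()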
Upvotes: 24
Reputation: 632
More generally, this could also look like this:
import multiprocessing

def chunks(l, n):
    # yield successive n-sized chunks from l
    for i in range(0, len(l), n):
        yield l[i:i + n]

numberOfThreads = 4

if __name__ == '__main__':
    jobs = []
    # `f` and `params` are assumed to be defined elsewhere
    for i, param in enumerate(params):
        p = multiprocessing.Process(target=f, args=(i, param))
        jobs.append(p)
    for i in chunks(jobs, numberOfThreads):
        for j in i:
            j.start()
        for j in i:
            j.join()
Of course, that way is quite crude (since it waits for every process in a chunk before it continues with the next chunk). Still, it works well when the function calls have approximately equal run times.
Upvotes: 7
Reputation: 2931
It might be most sensible to use multiprocessing.Pool, which produces a pool of worker processes based on the number of cores available on your system, and then basically feeds tasks in as the cores become available.
The example from the standard docs (http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers) shows that you can also manually set the number of worker processes:
from multiprocessing import Pool
def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)            # start 4 worker processes
    result = pool.apply_async(f, [10])  # evaluate "f(10)" asynchronously
    print result.get(timeout=1)         # prints "100" unless your computer is *very* slow
    print pool.map(f, range(10))        # prints "[0, 1, 4,..., 81]"
And it's also handy to know that there is the multiprocessing.cpu_count() function to count the number of cores on a given system, if needed in your code.
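For instance, a minimal sketch (Python 3 syntax; leaving one core free is just an illustrative choice, not something the library requires):
import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    # size the pool from the machine's core count, keeping one core free
    n_workers = max(1, multiprocessing.cpu_count() - 1)
    with multiprocessing.Pool(processes=n_workers) as pool:
        print(pool.map(square, range(10)))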
Edit: Here's some draft code that seems to work for your specific case:
import multiprocessing
def f(name):
    print 'hello', name

if __name__ == '__main__':
    pool = multiprocessing.Pool()  # use all available cores, otherwise specify the number you want as an argument
    for i in xrange(0, 512):
        pool.apply_async(f, args=(i,))
    pool.close()
    pool.join()
Upvotes: 151