Reputation: 360
I run a function in parallel and each worker has access to one GPU with id 0 or 1.
import os

def f(GPU_id, arg):
    # Select the GPU to use.
    os.environ["CUDA_VISIBLE_DEVICES"] = str(GPU_id)
    # Do something with arg.
Let's say I want to evaluate arg = [1, 2, 3, 4].
from multiprocessing import Pool

p = Pool(2)
for arg in [[1, 2], [3, 4]]:
    # Call the function in parallel on GPU 0 and 1
    p.starmap(f, zip([0, 1], arg))
But now, I would like to run it asynchronously (not sure if this is the right term), meaning that the workers do not wait until the others have finished their tasks before moving on to the next one. Therefore, each worker needs to keep its GPU_id and take the next argument from the list.
Any ideas how to do this?
Upvotes: 4
Views: 4126
Reputation: 2815
The answer by @eozd is correct, but can be improved.
This uniformly distributes multiple worker processes among a given number of GPUs. Multiple workers can share a GPU if they fit within its memory.
import logging
import multiprocessing
import os
import time

# logging just to get non-mangled outputs
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

def get_process_index(process) -> int:
    proc_name = process.name
    # e.g. "ForkPoolWorker-10"; the numbering may not start from zero upon
    # multiple Pool invocations, but it should be contiguous
    return int(proc_name.split("-")[1])

def initialize(gpus: list[int]):
    if gpus:
        proc_index = get_process_index(multiprocessing.current_process())
        selected_gpu = gpus[proc_index % len(gpus)]
        os.environ["CUDA_VISIBLE_DEVICES"] = str(selected_gpu)
        logger.info(f"process id: {proc_index} -> GPU id: {selected_gpu}")

def work(i):
    time.sleep(0.1)
    logger.info(f"work item {i} on GPU {os.environ['CUDA_VISIBLE_DEVICES']}")

available_gpu_ids = [3, 5, 7]
with multiprocessing.Pool(processes=4, initializer=initialize, initargs=(available_gpu_ids,)) as pool:
    pool.map(work, range(12))
Example output:
INFO:__main__:process id: 17 -> GPU id: 7
INFO:__main__:process id: 18 -> GPU id: 3
INFO:__main__:process id: 19 -> GPU id: 5
INFO:__main__:process id: 20 -> GPU id: 7
INFO:__main__:work item 2 on GPU 5
INFO:__main__:work item 0 on GPU 7
INFO:__main__:work item 1 on GPU 3
INFO:__main__:work item 3 on GPU 7
INFO:__main__:work item 5 on GPU 7
INFO:__main__:work item 4 on GPU 5
INFO:__main__:work item 6 on GPU 3
INFO:__main__:work item 7 on GPU 7
INFO:__main__:work item 9 on GPU 7
INFO:__main__:work item 8 on GPU 5
INFO:__main__:work item 10 on GPU 3
INFO:__main__:work item 11 on GPU 7
Note: An ML model (such as an ONNX session) can be loaded within the initializer function, stored in a global module attribute, and then called from the worker function.
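To make that note concrete, here is a minimal sketch of the load-in-initializer pattern. A plain dict stands in for the model object (e.g. an onnxruntime.InferenceSession), and the GPU ids [0, 1] are made up, so the sketch runs without any ML library or GPU:

```python
import multiprocessing
import os

# Module-level slot for the per-process model; each worker fills it once
# in the initializer, so it is not reloaded for every task.
_model = None

def initialize(gpus):
    global _model
    if gpus:
        # Same worker-index trick as in the answer above.
        proc_index = int(multiprocessing.current_process().name.split("-")[1])
        os.environ["CUDA_VISIBLE_DEVICES"] = str(gpus[proc_index % len(gpus)])
    # Stand-in for e.g. onnxruntime.InferenceSession(model_path); a dict is
    # used here so the sketch runs without any ML library installed.
    _model = {"gpu": os.environ.get("CUDA_VISIBLE_DEVICES")}

def infer(x):
    # Reuses the model loaded by this process's initializer.
    return x, _model["gpu"]

def run_demo():
    with multiprocessing.Pool(2, initializer=initialize, initargs=([0, 1],)) as pool:
        return pool.map(infer, range(4))

if __name__ == "__main__":
    print(run_demo())
```

Because the initializer runs exactly once per worker process, the expensive load cost is paid per worker, not per task.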
Upvotes: 1
Reputation: 1193
The problem you face is that each process must store which GPU it should use. Currently, by zipping, you manually choose which argument will be processed on which GPU. This, in turn, causes some processes to wait for others because they get assigned to the same GPU.
The way to fix this is to assign a GPU ID to each worker process before you use the map function. In this case, you don't manually zip the GPU IDs and instead let the multiprocessing module handle the scheduling.
Assigning GPU IDs to processes can be accomplished by finding the current process's ID and then getting the corresponding GPU ID from a mapping. To do this, we use the multiprocessing.current_process function to get the current process. After getting the process, we need a unique identifier we can use to map the process to a GPU ID. This can be done using the Process.name attribute. The documentation says that if the names are not set manually, each process is assigned a name of the form Process-<ID> (pool workers get names like ForkPoolWorker-<ID>). Therefore, we can extract the ID from the process name and use it to pick a unique GPU identifier.
import multiprocessing

# can also be a dictionary
gpu_id_list = [3, 5, 7, 10]

def function(x):
    cpu_name = multiprocessing.current_process().name
    # worker names look like "ForkPoolWorker-1"; extract the trailing number
    cpu_id = int(cpu_name[cpu_name.find('-') + 1:]) - 1
    gpu_id = gpu_id_list[cpu_id]
    return x * gpu_id

if __name__ == '__main__':
    pool = multiprocessing.Pool(4)
    input_list = [1, 1, 1, 1]
    print(pool.map(function, input_list))
prints [3, 5, 7, 10], meaning every process got assigned its own GPU identifier. (map returns results in input order, but which worker picks up which item is left to the scheduler, so the exact values may vary between runs.)
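Applied to the original question, the same name-parsing trick can pin each worker to a GPU the first time it runs a task; a module-level flag keeps the pinning from repeating. This is a sketch with made-up GPU ids [0, 1]; the real GPU computation would replace the placeholder comment:

```python
import multiprocessing
import os

gpu_id_list = [0, 1]  # hypothetical: one GPU per pool worker slot

_pinned = False  # per-process flag: has this worker chosen its GPU yet?

def work(arg):
    global _pinned
    if not _pinned:
        # First task in this worker process: derive a worker index from the
        # pool worker's name and pin the process to one GPU.
        name = multiprocessing.current_process().name
        worker_id = int(name[name.find('-') + 1:]) - 1
        os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id_list[worker_id % len(gpu_id_list)])
        _pinned = True
    # ... the real GPU computation on arg would go here ...
    return arg, os.environ["CUDA_VISIBLE_DEVICES"]

def run():
    with multiprocessing.Pool(2) as pool:
        return pool.map(work, [1, 2, 3, 4])

if __name__ == "__main__":
    print(run())
```

Each worker keeps its GPU for its whole lifetime, and the pool hands idle workers the next argument as soon as they finish, which is exactly the asynchronous behavior the question asks for.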
Upvotes: 5