bask0

Reputation: 360

Python Multiprocessing - Grab free GPU

I run a function in parallel and each worker has access to one GPU with id 0 or 1.

import os

def f(GPU_id, arg):
    # Select the GPU to use.
    os.environ["CUDA_VISIBLE_DEVICES"] = str(GPU_id)
    # Do something with arg.

Let's say I want to evaluate arg=[1, 2, 3, 4].

from multiprocessing import Pool

p = Pool(2)
for arg in [[1, 2], [3, 4]]:
    # Call the function in parallel on GPU 0 and 1
    p.starmap(f, zip([0, 1], arg))

But now, I would like to run it asynchronously (not sure if this is the right term), meaning that the workers do not wait for the others to finish their task before moving on to the next one. Each worker therefore needs to keep its GPU_id and take the next argument from the list.

Any ideas how to do this?

Upvotes: 4

Views: 4126

Answers (2)

Bohumir Zamecnik

Reputation: 2815

The answer by @eozd is correct, but can be improved.

  • handle worker process numbers higher than zero (which occur with multiple Pool invocations)
  • set the CUDA_VISIBLE_DEVICES env variable in the initializer function

This uniformly distributes multiple worker processes among a given number of GPUs. Multiple workers can share a GPU if they fit within its memory.

import logging
import multiprocessing
import os
import time

# logging just to get non-mangled outputs
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

def get_process_index(process) -> int:
    # e.g. "ForkPoolWorker-10"; workers may not be numbered from 1 when
    # multiple Pools have been created, but the numbering should be contiguous
    return int(process.name.split("-")[1])

def initialize(gpus: list[int]):
    if gpus:
        proc_index = get_process_index(multiprocessing.current_process())
        selected_gpu = gpus[proc_index % len(gpus)]
        os.environ["CUDA_VISIBLE_DEVICES"] = str(selected_gpu)
        logger.info(f"process id: {proc_index} -> GPU id: {selected_gpu}")

def work(i):
    time.sleep(0.1)
    logger.info(f"work item {i} on GPU {os.environ['CUDA_VISIBLE_DEVICES']}")

if __name__ == "__main__":
    available_gpu_ids = [3, 5, 7]
    with multiprocessing.Pool(processes=4, initializer=initialize, initargs=(available_gpu_ids,)) as pool:
        pool.map(work, range(12))

Example output:

INFO:__main__:process id: 17 -> GPU id: 7
INFO:__main__:process id: 18 -> GPU id: 3
INFO:__main__:process id: 19 -> GPU id: 5
INFO:__main__:process id: 20 -> GPU id: 7
INFO:__main__:work item 2 on GPU 5
INFO:__main__:work item 0 on GPU 7
INFO:__main__:work item 1 on GPU 3
INFO:__main__:work item 3 on GPU 7
INFO:__main__:work item 5 on GPU 7
INFO:__main__:work item 4 on GPU 5
INFO:__main__:work item 6 on GPU 3
INFO:__main__:work item 7 on GPU 7
INFO:__main__:work item 9 on GPU 7
INFO:__main__:work item 8 on GPU 5
INFO:__main__:work item 10 on GPU 3
INFO:__main__:work item 11 on GPU 7

Note: A ML model (such as with an ONNX session) can be loaded within the initializer function and stored in some global module attribute and then be called from the worker function.

Upvotes: 1

eozd

Reputation: 1193

The problem you face is that each process must remember which GPU it should use. Currently, by zipping, you manually choose which argument is processed on which GPU. This, in turn, causes some processes to wait for others because they are assigned to the same GPU.

The way to fix this is to assign a GPU ID to each worker process before you use the map function. Then you don't manually zip the GPU IDs, and you let the multiprocessing module handle the scheduling.

Assigning GPU IDs to processes can be accomplished by finding the current process ID and then looking up the corresponding GPU ID in a mapping. To do this, we use the multiprocessing.current_process function to get the current process. We then need a unique identifier to map the process to a GPU ID, which the Process.name attribute provides. The documentation states that if names are not set manually, processes are assigned names of the form Process-<ID>. Therefore, we can extract the ID from the process name and use it to pick a unique GPU identifier.

Working Example

import multiprocessing

# can also be a dictionary
gpu_id_list = [3, 5, 7, 10]


def function(x):
    cpu_name = multiprocessing.current_process().name
    cpu_id = int(cpu_name[cpu_name.find('-') + 1:]) - 1
    gpu_id = gpu_id_list[cpu_id]

    return x * gpu_id


if __name__ == '__main__':
    pool = multiprocessing.Pool(4)
    input_list = [1, 1, 1, 1]
    print(pool.map(function, input_list))

prints [3, 5, 7, 10] meaning every process gets assigned to its own GPU identifier.

Upvotes: 5
