James

Reputation: 45

Achieving parallelism with multiprocessing on batches of data

I have data that I want to perform a batched process on using two GPUs in parallel, so I wrote the following worker class:

import multiprocessing

import cupy as cp


class Worker(multiprocessing.Process):
    def __init__(self, gpu_id: int, res_queue: multiprocessing.Queue, class_func: GPUFunc):
        super().__init__()
        self.gpu_id = gpu_id
        self.class_func = class_func
        self.res_queue = res_queue

    def run_func(self):
        # Run the wrapped GPU function on this worker's batch and queue the result
        with cp.cuda.Device(self.gpu_id):
            self.res_queue.put(self.class_func.run_func(self.batch))

    def alloc_batch(self, batch):
        # Copy the host batch onto this worker's GPU
        with cp.cuda.Device(self.gpu_id):
            self.batch = cp.asarray(batch)
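To see where these calls actually execute, one debugging idea I had (untested, not in my code above) is to print the pid inside run_func and compare it with the main process's pid:

import os

class Worker(multiprocessing.Process):
    # __init__ and alloc_batch as above

    def run_func(self):
        # If this prints the same pid as the main process, the call is
        # executing in the parent, not in the worker process
        print("run_func pid:", os.getpid())
        with cp.cuda.Device(self.gpu_id):
            self.res_queue.put(self.class_func.run_func(self.batch))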

So I'm putting the results into a queue to deal with later, and this is how I initialize and run those workers:

queue_list = [multiprocessing.Queue() for _ in range(num_gpus)]
workers = []
for i in range(num_gpus):
    workers.append(Worker(gpu_ids[i], queue_list[i], func_class_object))

for worker in workers:
    worker.start()

for num_batch in range(num_batches):
    main_batch = get_batch(num_batch)  # Gives some arbitrary batch of data

    divided_batches = divide_batch(main_batch, num_gpus)

    # Hand each worker its share of the batch
    for worker_ind in range(len(workers)):
        workers[worker_ind].alloc_batch(divided_batches[worker_ind])

    for worker in workers:
        worker.run_func()

# ------ Take out all the batch results from the queues after the for loop ------
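That drain step looks roughly like this (a sketch, assuming one result per worker per batch; "results" is just an illustrative name):

# Sketch of the drain: one q.get() per worker per batch
results = [[] for _ in queue_list]
for _ in range(num_batches):
    for q_ind, q in enumerate(queue_list):
        results[q_ind].append(q.get())  # blocks until that worker's result arrives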

and then I call worker.join() on each worker.

My question is: do the two processes run in parallel when I call "worker.run_func()" inside the for loop, or does each call wait for run_func to finish before the next iteration?

If it isn't parallel, what changes do I need to make to achieve it?
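One change I've been considering (an untested sketch, using the same imports as above; "QueueWorker", "task_queue", and the None sentinel are my own placeholders) is to override run() and feed batches to each worker through a per-worker task queue, so the GPU work happens inside the child process:

class QueueWorker(multiprocessing.Process):
    def __init__(self, gpu_id, task_queue, res_queue, class_func):
        super().__init__()
        self.gpu_id = gpu_id
        self.task_queue = task_queue
        self.res_queue = res_queue
        self.class_func = class_func

    def run(self):
        # run() is the only method that executes in the child process
        with cp.cuda.Device(self.gpu_id):
            while True:
                batch = self.task_queue.get()
                if batch is None:  # sentinel meaning "no more batches"
                    break
                self.res_queue.put(self.class_func.run_func(cp.asarray(batch)))

Would something like this be the right direction, or is there a simpler fix to my original class?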

Also, when watching with "nvidia-smi -l 1", I can see from the utilization column that the GPUs work one after the other.

Upvotes: 0

Views: 55

Answers (0)
