Vespa

Reputation: 52

Multiprocessing.Process does not run processes in parallel

I tried to run a very simple multiprocessing example, but the work is still processed serially.

I have tried to run it on Mac (macOS 10.13) and Linux (Ubuntu 18.04) with both Python 2 and 3, and I had the same problem in both environments.

The function _process has to receive numpy arrays as arguments, so I decided to use multiprocessing.Process instead of Pool.map() and Pool.apply_async(), because pickling breaks when pool.map() is used on a method of a class: https://stackoverflow.com/a/21345308/4755986
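For reference, a pool-based variant does pickle cleanly when the worker is a module-level function rather than a method (a minimal sketch with illustrative names, not my actual code):

```python
# Sketch: Pool.map pickles cleanly when the worker is a module-level
# function instead of an instance method (illustrative names).
from multiprocessing import Pool
import numpy as np

def summed(args):
    x, y, z = args
    # stand-in for the real per-task work
    return float(x.sum() + y.sum() + z.sum())

if __name__ == '__main__':
    tasks = [(np.ones([5, 5]), np.ones([3, 3]), np.ones([2, 2]))] * 4
    pool = Pool(4)
    print(pool.map(summed, tasks))   # each task yields 25 + 9 + 4 = 38.0
    pool.close()
    pool.join()
```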

import time
from multiprocessing import Process, Queue
import numpy as np

class model:

    def __init__(self):
        self.results = []
        self.jobs = []
        self.start = time.time()

    def _process(self, x, y, z):
        j = 0
        for i in range(10**8):
            j = i + j
        return j

    def work(self, X, Y, Z, result_queue):
        start = time.time() - self.start
        result = self._process(X, Y, Z)
        result_queue.put(result)
        print(result)
        end = time.time() - self.start
        print('start time: ', start)
        print('end time:', end)
#        return result_queue

    def fit(self, num):
        for i in range(num):
            X, Y, Z = np.ones([5, 5]), np.ones([3, 3]), np.ones([2, 2])
            result_queue = Queue()
            p = Process(target=self.work, args=(X, Y, Z, result_queue))
            self.jobs.append(p)
            p.start()
            print('ChildProcess...', i)
            result = result_queue.get()
            self.results.append(result)

        for p in self.jobs:
            p.join()
            p.close()

        return self.results


R = model()
k = R.fit(10)
print(k)

The start and end times of each process are printed, and the second process only starts after the first one is finished. This is strange because each process should automatically be assigned to a different core and run in parallel.

Upvotes: 0

Views: 438

Answers (1)

Demindiro

Reputation: 334

result = result_queue.get()

result_queue.get() blocks if the queue is empty. An item is only added when a process finishes, so the next process is spawned only after the previous one has finished.
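The blocking behaviour is easy to demonstrate with get(timeout=...), which raises queue.Empty instead of waiting forever (a minimal sketch):

```python
from multiprocessing import Queue
from queue import Empty  # the module is named Queue in Python 2

q = Queue()
blocked = False
try:
    q.get(timeout=0.1)   # get() waits for an item; here none ever arrives
except Empty:
    blocked = True       # the timeout fires instead of blocking forever
print('get() on an empty queue blocks:', blocked)
```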

Below is a version that does spawn all 10 processes at once. I've marked the sections I've changed:

import time
from multiprocessing import Process, Queue
import numpy as np

class model:

    def __init__(self):
        self.results = []
        self.jobs = []
        self.start = time.time()

    def _process(self, x, y, z):
        j = 0
        for i in range(10**8):
            j = i + j
        return j

    def work(self, X, Y, Z, result_queue):
        start = time.time() - self.start
        result = self._process(X, Y, Z)
        result_queue.put(result)
        print(result)
        end = time.time() - self.start
        print('start time: ', start)
        print('end time:', end)

    def fit(self, num):
        result_queue = Queue()     # <----- one shared queue, created once
        for i in range(num):
            X, Y, Z = np.ones([5, 5]), np.ones([3, 3]), np.ones([2, 2])
            p = Process(target=self.work, args=(X, Y, Z, result_queue))
            self.jobs.append(p)
            p.start()
            print('ChildProcess...', i)
            #result = result_queue.get()   # <--- This blocks
            #self.results.append(result)

        for _ in self.jobs:                           # <----- drain the queue
            self.results.append(result_queue.get())   # before joining, so the
                                                      # children can flush and exit
        for p in self.jobs:
            p.join()

        return self.results


R = model()
k = R.fit(10)
print(k)
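On Python 3 you can also sidestep the manual queue entirely with concurrent.futures, assuming the worker can live at module level so it pickles (a sketch, not a drop-in replacement for your class):

```python
from concurrent.futures import ProcessPoolExecutor
import numpy as np

def work(x, y, z):
    # stand-in for the heavy _process loop
    return float(x.sum() + y.sum() + z.sum())

if __name__ == '__main__':
    xs = [np.ones([5, 5])] * 10
    ys = [np.ones([3, 3])] * 10
    zs = [np.ones([2, 2])] * 10
    # the executor schedules the calls across worker processes and
    # collects the results in order
    with ProcessPoolExecutor() as ex:
        results = list(ex.map(work, xs, ys, zs))
    print(results)
```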

Upvotes: 2
