Reputation: 1801
In the example problem below, the main program creates a list of data_size random strings. Without multiprocessing, the data is sent directly to Test.iterate(), where the class merely adds the string Test- to the beginning of each random string. When run without multiprocessing, the code works very well for both small and large values of data_size.
I decided to add multiprocessing to this test problem, and I broke the core components of multiprocessing down into a class titled MultiProc. The member function MultiProc.run_processes() manages all functions in the class. The function assumes that the input list will be divided into smaller sub-lists, one per process the user wishes to utilize. As a result, the function starts by determining the upper and lower indices of each sub-list relative to the initial list, so the code knows which portion each process should iterate over (for example, with data_size = 9 and two processes, the bounds are lower = [0, 4] and upper = [4, 9]). The function then creates the processes, starts them, joins them, extracts the data from the Queue, and finally re-orders the returned data based on a counter that is passed to the worker function. The MultiProc class works fairly well for small values of data_size, but above a value of ~500 the code never terminates, although I suspect the exact threshold will vary from computer to computer depending on memory. At some point the multiprocessing run stops working, and I suspect it has something to do with the way data is returned from the worker processes. Does anyone know what might be causing this problem and how to fix it?
from multiprocessing import Process, Queue
from itertools import chain
import string
import random
class Test:
def __init__(self, array_list):
self.array_list = array_list
def func(self, names):
return 'Test-' + names
def iterate(self, upper, lower, counter):
output = [self.func(self.array_list[i]) for i in range(lower, upper)]
return output, counter
class MultiProc:
def __init__(self, num_procs, data_array, func):
self.num_procs = num_procs
self.data_array = data_array
self.func = func
        if self.num_procs > len(self.data_array):
            self.num_procs = len(self.data_array)
        # integer length of each sub-list (the final sub-list absorbs any remainder)
        self.length = len(self.data_array) // self.num_procs
def run_processes(self):
upper = self.__determine_upper_indices()
lower = self.__determine_lower_indices(upper)
p, q = self.__initiate_proc(self.func, upper, lower)
self.__start_thread(p)
self.__join_threads(p)
results = self.__extract_data(q)
new = self.__reorder_data(results)
return new
    def __determine_upper_indices(self):
        # upper bound of each sub-list; the final bound absorbs any remainder
        upper = [i * self.length for i in range(1, self.num_procs)]
        upper.append(len(self.data_array))
        return upper
    def __determine_lower_indices(self, upper):
        # each sub-list starts where the previous one ends
        lower = [upper[i] for i in range(len(upper) - 1)]
        lower = [0] + lower
        return lower
def __initiate_proc(self, func, upper, lower):
q = Queue()
p = [Process(target=self.run_and_send_back_output,
args=(q, func, upper[i], lower[i], i))
for i in range(self.num_procs)]
return p, q
    def __start_thread(self, p):
        for proc in p:
            proc.start()
    def __join_threads(self, p):
        for proc in p:
            proc.join()
def __extract_data(self, q):
results = []
while not q.empty():
results.extend(q.get())
return results
    def __reorder_data(self, results):
        # 'results' alternates output lists and counters; for each counter
        # value j, take the output list that precedes it, then flatten
        new = [results[i - 1] for j in range(self.num_procs)
               for i in range(len(results)) if results[i] == j]
        new = list(chain.from_iterable(new))
        return new
def run_and_send_back_output(self, queue, func, *args):
result = func(*args) # run the func
queue.put(result) # send the result back
def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
if __name__ == "__main__":
random.seed(1234)
data_size = 9
num_proc = 2
test_list = [id_generator() for i in range(data_size)]
obj1 = Test(test_list)
result1 = obj1.iterate(data_size, 0, 1)
print(result1)
multi = MultiProc(num_proc, test_list, obj1.iterate)
result2 = multi.run_processes()
print(result2)
# >> ['Test-2HAFCF', 'Test-GWPBBB', 'Test-W43JFL', 'Test-HA65PE',
# 'Test-83EF6C', 'Test-R9ET4W', 'Test-RPM37B', 'Test-6EAVJ4',
# 'Test-YKDE5K']
Upvotes: 0
Views: 780
Reputation: 69042
Your main problem is this:
self.__start_thread(p)
self.__join_threads(p)
results = self.__extract_data(q)
You start your workers, which try to put something in a queue, then join the workers, and only after that do you start retrieving data from the queue. The workers, however, can only exit after all of their data has been flushed to the underlying pipe, and will block on exit otherwise. Joining processes that are blocked like this before retrieving their elements from the pipe can result in a deadlock.
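If you want to keep your current design, one way to avoid this is to drain the queue before joining. Here is a minimal, untested sketch of a revised run_processes() that reuses your existing helpers; pulling exactly one item per worker also avoids relying on q.empty(), which is not reliable for multiprocessing queues:
def run_processes(self):
    upper = self.__determine_upper_indices()
    lower = self.__determine_lower_indices(upper)
    p, q = self.__initiate_proc(self.func, upper, lower)
    self.__start_thread(p)
    # drain the queue *before* joining: one get() per worker,
    # so each worker can flush its pipe and exit cleanly
    results = []
    for _ in range(self.num_procs):
        results.extend(q.get())
    self.__join_threads(p)
    return self.__reorder_data(results)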
Maybe you should look into multiprocessing.Pool instead, as what you're trying to implement is essentially a map() operation. Your example could be rewritten more elegantly, something like this:
from multiprocessing import Pool
import string
import random
def func(name):
return 'Test-' + name
def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
if __name__ == "__main__":
random.seed(1234)
data_size = 5000
num_proc = 2
test_list = [id_generator() for i in range(data_size)]
with Pool(num_proc) as pool:
result = pool.map(func, test_list)
print(result)
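Note that Pool.map() returns the results in the same order as the input iterable, so the counter bookkeeping and manual re-ordering from your original code are not needed.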
Upvotes: 2