Jon

Reputation: 1801

Python multiprocessing example never terminates when the dataset is too large

In the example problem below, the main program creates a list of random strings of length data_size. Without multiprocessing, the data is sent directly to Test.iterate(), where the class merely adds the string Test- to the beginning of each random string. When run without multiprocessing, the code works well for both small and large values of data_size.

I decided to add multiprocessing to this test problem, and I broke the core components of multiprocessing down into a class titled MultiProc. The member function MultiProc.run_processes() manages all functions in the class. The function assumes that the input list will be divided into smaller sub-lists, one per process the user wishes to utilize. It therefore starts by determining the upper and lower indices of each sub-list relative to the initial list, so the code knows which portion each process should iterate over. The function then creates the processes, starts them, joins them, extracts the data from the Queue, and finally re-orders the returned data based on a counter that is passed to the primary function.

The MultiProc class works fairly well for small values of data_size, but above a value of ~500 the code never terminates, although I suspect the exact threshold will vary from computer to computer depending on memory. At some point, however, the multiprocessing function stops working, and I suspect it has something to do with the way data is returned from the worker processes. Does anyone know what might be causing this problem and how to fix it?

from multiprocessing import Process, Queue
from itertools import chain
import string
import random


class Test:
    def __init__(self, array_list):
        self.array_list = array_list

    def func(self, names):
        return 'Test-' + names

    def iterate(self, upper, lower, counter):
        output = [self.func(self.array_list[i]) for i in range(lower, upper)]
        return output, counter


class MultiProc:
    def __init__(self, num_procs, data_array, func):
        self.num_procs = num_procs
        self.data_array = data_array
        self.func = func
        if self.num_procs > len(self.data_array):
            self.num_procs = len(self.data_array)
        # number of elements assigned to each process (integer division)
        self.length = int((len(self.data_array) / self.num_procs) // 1)

    def run_processes(self):
        upper = self.__determine_upper_indices()
        lower = self.__determine_lower_indices(upper)
        p, q = self.__initiate_proc(self.func, upper, lower)
        self.__start_thread(p)
        self.__join_threads(p)
        results = self.__extract_data(q)
        new = self.__reorder_data(results)
        return new

    def __determine_upper_indices(self):
        upper = [i * self.length for i in range(1, self.num_procs)]
        upper.append(len(self.data_array))
        return upper

    def __determine_lower_indices(self, upper):
        lower = [upper[i] for i in range(len(upper) - 1)]
        lower = [0] + lower
        return lower

    def __initiate_proc(self, func, upper, lower):
        q = Queue()
        p = [Process(target=self.run_and_send_back_output,
                     args=(q, func, upper[i], lower[i], i))
                     for i in range(self.num_procs)]
        return p, q

    def __start_thread(self, p):
        [p[i].start() for i in range(self.num_procs)]

    def __join_threads(self, p):
        [p[i].join() for i in range(self.num_procs)]

    def __extract_data(self, q):
        # each queue item is an (output, counter) tuple; extend() unpacks
        # it into the flat results list as [output, counter, ...]
        results = []
        while not q.empty():
            results.extend(q.get())
        return results

    def __reorder_data(self, results):
        # for each counter value j, find it in the flat results list and
        # take the output list stored just before it, restoring input order
        new = [results[i - 1] for j in range(self.num_procs)
               for i in range(len(results)) if results[i] == j]
        new = list(chain.from_iterable(new))
        return new

    def run_and_send_back_output(self, queue, func, *args):
        result = func(*args)  # run the func
        queue.put(result)    # send the result back

def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
    return ''.join(random.choice(chars) for _ in range(size))

if __name__ == "__main__":
    random.seed(1234)
    data_size = 9
    num_proc = 2
    test_list = [id_generator() for i in range(data_size)]
    obj1 = Test(test_list)
    result1 = obj1.iterate(data_size, 0, 1)
    print(result1)
    multi = MultiProc(num_proc, test_list, obj1.iterate)
    result2 = multi.run_processes()
    print(result2)
    # >> ['Test-2HAFCF', 'Test-GWPBBB', 'Test-W43JFL', 'Test-HA65PE',
    #     'Test-83EF6C', 'Test-R9ET4W', 'Test-RPM37B', 'Test-6EAVJ4',
    #     'Test-YKDE5K']

Upvotes: 0

Views: 780

Answers (1)

mata

Reputation: 69042

Your main problem is this:

    self.__start_thread(p)
    self.__join_threads(p)
    results = self.__extract_data(q)

You start your workers, which try to put something into a queue; then you join the workers, and only after that do you start retrieving data from the queue. The workers, however, can only exit once all of their data has been flushed to the underlying pipe, and will block on exit otherwise. Joining processes that are blocked like this before you start retrieving elements from the pipe can result in a deadlock.
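
One way to fix this within your current design is to drain the queue before joining the workers. Here is a minimal sketch of a replacement run_processes() (keeping your counter-based reordering, and assuming each worker puts exactly one result tuple on the queue, as your run_and_send_back_output() does):

def run_processes(self):
    upper = self.__determine_upper_indices()
    lower = self.__determine_lower_indices(upper)
    p, q = self.__initiate_proc(self.func, upper, lower)
    self.__start_thread(p)
    # drain the queue *before* joining: one blocking get() per worker
    # guarantees each child can flush its result to the pipe and exit,
    # so the join() below cannot block forever
    results = []
    for _ in range(self.num_procs):
        results.extend(q.get())
    self.__join_threads(p)
    return self.__reorder_data(results)

Using one blocking get() per worker also avoids the race in your __extract_data(), where q.empty() can return True while a worker is still flushing its result.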

Maybe you should look into multiprocessing.Pool, as what you're trying to implement is a kind of map() operation. Your example could be rewritten more elegantly as something like this:

from multiprocessing import Pool
import string
import random


def func(name):
    return 'Test-' + name

def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
    return ''.join(random.choice(chars) for _ in range(size))

if __name__ == "__main__":
    random.seed(1234)
    data_size = 5000
    num_proc = 2
    test_list = [id_generator() for i in range(data_size)]
    with Pool(num_proc) as pool:
        result = pool.map(func, test_list)
    print(result)
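
Note that pool.map() splits the input into chunks for you and returns the results in the same order as the input iterable, so the manual index bookkeeping and counter-based reordering from your version become unnecessary.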

Upvotes: 2
