Jay Mody

Reputation: 4033

multiprocessing within classes

I have a class Processor that takes in some input data (which we'll call examples), processes it, and outputs the results. At a high level it looks like this:

from tqdm import tqdm
import multiprocessing

class Processor:
    def __init__(self, arg1, arg2, model_path):
        self.arg1 = arg1
        self.arg2 = arg2
        # load model from very large file that will take some time
        self.model = load_model_from_path(model_path)

    def process_all_examples(self, all_examples):
        all_results = []
        pool = multiprocessing.Pool(4)
        for result in tqdm(pool.imap_unordered(self.process_single_example, all_examples), total=len(all_examples)):
            all_results.append(result)
        return all_results

    def process_single_example(self, example):
        # do some complicated calculations on the example that use
        # self.arg1, self.arg2, and self.model
        return result

The idea is that the processor is initialized once (loading the model takes a good amount of time) and can take advantage of a multicore machine to process the input examples. The above doesn't work, since bound class methods are not picklable for multiprocessing. After consulting the following StackOverflow posts:

call multiprocessing in class method Python

Multiprocessing: How to use Pool.map on a function defined in a class?

I came up with the following solution:

from tqdm import tqdm
import multiprocessing

class Processor:
    def __init__(self, arg1, arg2, model_path):
        self.arg1 = arg1
        self.arg2 = arg2
        # load model from very large file that will take some time
        self.model = load_model_from_path(model_path)

    def process_all_examples(self, all_examples):
        all_results = []
        all_inputs = [(self, example) for example in all_examples]
        pool = multiprocessing.Pool(4)
        for result in tqdm(pool.imap_unordered(self.process_single_example, all_inputs), total=len(all_inputs)):
            all_results.append(result)
        return all_results

    @staticmethod
    def process_single_example(inputs):
        self, example = inputs
        # do some complicated calculations on the example that use
        # self.arg1, self.arg2, and self.model
        return result

However, this didn't work either. If I try to run process_all_examples, it gets stuck at .imap_unordered. For testing purposes, I tried it with some dummy data/processing to understand what was happening, but rather than getting stuck, the multiprocessing version was just very slow:

from tqdm import tqdm
import multiprocessing

class Processor:
    def __init__(self, arg1, arg2):
        self.arg1 = arg1
        self.arg2 = arg2
        # load model from very large file that will take some time
        self.model = [i for i in range(1000)]

    def process_all_examples_multi(self, all_examples, nproc=4):
        all_results = []
        all_inputs = [(self, example) for example in all_examples]
        pool = multiprocessing.Pool(nproc)
        for result in tqdm(pool.imap_unordered(self.process_single_example, all_inputs), total=len(all_inputs)):
            all_results.append(result)
        return all_results

    def process_all_examples_single(self, all_examples):
        all_results = []
        all_inputs = [(self, example) for example in all_examples]
        for _input in tqdm(all_inputs):
            all_results.append(self.process_single_example(_input))
        return all_results

    @staticmethod
    def process_single_example(inputs):
        self, example = inputs
        result = self.arg1 * self.arg2 * self.model[3] * example
        return result

processor = Processor(-1, 2)
all_examples = list(range(100000))

results = processor.process_all_examples_multi(all_examples) # slower
results = processor.process_all_examples_single(all_examples) # faster

Adding a chunksize parameter (with a value between 100 and 10000) to .imap_unordered seems to significantly increase performance, but it never surpasses that of just using a single core without multiprocessing.Pool.
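Concretely, that change is just an extra keyword argument on the call inside process_all_examples_multi (the value here is illustrative):

# same loop as above, but batching tasks to reduce IPC overhead
for result in tqdm(pool.imap_unordered(self.process_single_example, all_inputs, chunksize=1000), total=len(all_inputs)):
    all_results.append(result)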

I know there are alternatives, one being to redesign the way my code is structured, the other being to use globals, but I can't shake the feeling that I'm just missing something here. I've also tried using the pathos.multiprocessing module from the pathos library (sketched below) to no avail.
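For reference, my pathos attempt looked roughly like this (a sketch; pathos's ProcessingPool serializes with dill, so the method pickles, but each task still carries self and the model):

from pathos.multiprocessing import ProcessingPool

# rough shape of the pathos attempt, using the names from the dummy example above
all_inputs = [(processor, example) for example in all_examples]
pool = ProcessingPool(nodes=4)
results = list(pool.uimap(Processor.process_single_example, all_inputs))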

Upvotes: 0

Views: 645

Answers (1)

tdelaney

Reputation: 77337

With multiprocessing you have to worry about the payload passed from parent to child versus the work done. Since you are using a forking operating system, parent and child share the same memory at the point the pool is created. But you haven't really leveraged that, because you pass self and its data (your model) to the child for every work item.
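One way to see that cost is to measure the pickle payload for a single work item (a quick sketch using the dummy Processor above, not part of the original answer):

import pickle

processor = Processor(-1, 2)
# every (self, example) tuple re-serializes the whole model...
print(len(pickle.dumps((processor, 42))))  # grows with the size of self.model
# ...while an index alone is tiny
print(len(pickle.dumps(42)))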

You can set up some global state that the workers know about and put the data there. Anything big goes in the global state, and the only thing passed by the pool is an index into the current data for that worker. Adding chunksize reduces the communication overhead, so it is good to add when you have many work items that all take a relatively similar amount of time to calculate.

There is overhead to multiprocessing - it's not worthwhile if the individual worker calculations are small. In this example I boosted the work done with an extra for loop, assuming your actual work is fairly large. But if it isn't, the pool really won't help.

from tqdm import tqdm
import multiprocessing
import threading

# will hold (Processor, example set) for process_all_examples_multi
_process_this = None
_process_this_lock = threading.Lock()

class Processor:
    def __init__(self, arg1, arg2):
        self.arg1 = arg1
        self.arg2 = arg2
        # load model from very large file that will take some time
        self.model = [i for i in range(1000)]

    def process_all_examples_multi(self, all_examples, nproc=4):
        # setup memory state for processing pool
        with _process_this_lock:
            global _process_this
            _process_this = (self, all_examples)
            # context manager terminates the pool when done
            with multiprocessing.Pool(nproc) as pool:
                all_results = list(tqdm(pool.imap_unordered(
                    self.process_single_example_2, range(len(all_examples)), chunksize=100),
                    total=len(all_examples)))
            return all_results

    def process_all_examples_single(self, all_examples):
        all_results = []
        all_inputs = [(self, example) for example in all_examples]
        for _input in tqdm(all_inputs):
            all_results.append(self.process_single_example(_input))
        return all_results

    @staticmethod
    def process_single_example(inputs):
        self, example = inputs
        result = self.arg1 * self.arg2 * self.model[3] * example
        # let's simulate more work
        for i in range(10000):
            pass
        return result

    @staticmethod
    def process_single_example_2(example_index):
        processor, example = _process_this
        result = processor.arg1 * processor.arg2 * processor.model[3] * example[example_index]
        # let's simulate more work
        for i in range(10000):
            pass
        return result

processor = Processor(-1, 2)
all_examples = list(range(100000))

results = processor.process_all_examples_multi(all_examples)
# vs
results = processor.process_all_examples_single(all_examples)
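
As a side note, a similar effect can be had with Pool's initializer argument (a sketch with hypothetical helper names, not part of the code above): initargs are pickled once per worker at startup instead of once per task, and this also works on spawn-based platforms where fork's shared memory isn't available.

import multiprocessing
from tqdm import tqdm

_worker_state = None

def _init_worker(processor, examples):
    # runs once in each worker process; stashes the shared state globally
    global _worker_state
    _worker_state = (processor, examples)

def _process_index(i):
    processor, examples = _worker_state
    return processor.arg1 * processor.arg2 * processor.model[3] * examples[i]

def process_all_examples_initializer(processor, all_examples, nproc=4):
    with multiprocessing.Pool(nproc, initializer=_init_worker,
                              initargs=(processor, all_examples)) as pool:
        return list(tqdm(pool.imap_unordered(_process_index,
                                             range(len(all_examples)),
                                             chunksize=100),
                         total=len(all_examples)))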

Upvotes: 1
