Reputation: 4033
I have a class Processor that takes in some input data (which we are going to call examples), processes the input data, and outputs the results. At a high level it looks like this:
from tqdm import tqdm
import multiprocessing

class Processor:
    def __init__(self, arg1, arg2, model_path):
        self.arg1 = arg1
        self.arg2 = arg2
        # load model from very large file that will take some time
        self.model = load_model_from_path(model_path)

    def process_all_examples(self, all_examples):
        all_results = []
        pool = multiprocessing.Pool(4)
        for result in tqdm(pool.imap_unordered(self.process_single_example, all_examples), total=len(all_examples)):
            all_results.append(result)
        return all_results

    def process_single_example(self, example):
        # do some complicated calculations on the example that use
        # self.arg1, self.arg2, and self.model
        return result
The idea is that the processor is initialized once (loading the model takes a good amount of time) and can take advantage of a multicore machine to process the input examples. The above doesn't work, since class methods are not picklable for multiprocessing. After consulting the following StackOverflow posts:
call multiprocessing in class method Python
Multiprocessing: How to use Pool.map on a function defined in a class?
I came up with the following solution:
from tqdm import tqdm
import multiprocessing

class Processor:
    def __init__(self, arg1, arg2, model_path):
        self.arg1 = arg1
        self.arg2 = arg2
        # load model from very large file that will take some time
        self.model = load_model_from_path(model_path)

    def process_all_examples(self, all_examples):
        all_results = []
        all_inputs = [(self, example) for example in all_examples]
        pool = multiprocessing.Pool(4)
        for result in tqdm(pool.imap_unordered(self.process_single_example, all_inputs), total=len(all_inputs)):
            all_results.append(result)
        return all_results

    @staticmethod
    def process_single_example(inputs):
        self, example = inputs
        # do some complicated calculations on the example that use
        # self.arg1, self.arg2, and self.model
        return result
However, this didn't work. If I try to run process_all_examples, it gets stuck at .imap_unordered. For testing purposes, I tried it using some dummy data/processing to understand what was happening, but rather than getting stuck, the multiprocessing was just super slow:
from tqdm import tqdm
import multiprocessing

class Processor:
    def __init__(self, arg1, arg2):
        self.arg1 = arg1
        self.arg2 = arg2
        # load model from very large file that will take some time
        self.model = [i for i in range(1000)]

    def process_all_examples_multi(self, all_examples, nproc=4):
        all_results = []
        all_inputs = [(self, example) for example in all_examples]
        pool = multiprocessing.Pool(nproc)
        for result in tqdm(pool.imap_unordered(self.process_single_example, all_inputs), total=len(all_inputs)):
            all_results.append(result)
        return all_results

    def process_all_examples_single(self, all_examples):
        all_results = []
        all_inputs = [(self, example) for example in all_examples]
        for _input in tqdm(all_inputs):
            all_results.append(self.process_single_example(_input))
        return all_results

    @staticmethod
    def process_single_example(inputs):
        self, example = inputs
        result = self.arg1 * self.arg2 * self.model[3] * example
        return result

processor = Processor(-1, 2)
all_examples = list(range(100000))
results = processor.process_all_examples_multi(all_examples)  # slower
results = processor.process_all_examples_single(all_examples)  # faster
Adding a chunksize parameter (with a value between 100 and 10000) to .imap_unordered seems to significantly increase performance, but it never surpasses that of just using a single core without multiprocessing.Pool.
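For reference, this is a minimal sketch of how I was passing chunksize in the dummy version of process_all_examples_multi above (1000 is just one of the values I tried):

    def process_all_examples_multi(self, all_examples, nproc=4, chunksize=1000):
        all_results = []
        all_inputs = [(self, example) for example in all_examples]
        pool = multiprocessing.Pool(nproc)
        # chunksize batches the inputs so fewer, larger messages cross the process boundary
        for result in tqdm(pool.imap_unordered(self.process_single_example, all_inputs, chunksize=chunksize),
                           total=len(all_inputs)):
            all_results.append(result)
        return all_results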
I know there are alternatives: one is to redesign the way my code is structured, the other is to use globals, but I can't shake the feeling that I'm just missing something here. I've also tried using the pathos.multiprocessing module from the pathos library, to no avail.
Upvotes: 0
Views: 645
Reputation: 77337
With multiprocessing you have to weigh the payload passed from parent to child against the work done. Since you are using a forking operating system, parent and child share the same memory at the point the pool is created. But you haven't really leveraged that, because you pass self and its data (your model) to the child for every work item.
You can set up some global state that the workers know about and put the data there. Anything big goes in the global state, and the only thing passed by the pool is an index into the current data for that worker. Adding chunksize reduces the communication overhead, so it is worth adding when you have a lot of work items and they all take relatively similar amounts of time to calculate.
There is overhead to multiprocessing - it's not worthwhile if individual worker calculations are small. In this example I boosted the work done with an extra for loop, assuming your actual work is pretty large. But if it isn't, the pool really won't help.
from tqdm import tqdm
import multiprocessing
import threading

# will hold (Processor, example set) for process_all_examples_multi
_process_this = None
_process_this_lock = threading.Lock()

class Processor:
    def __init__(self, arg1, arg2):
        self.arg1 = arg1
        self.arg2 = arg2
        # load model from very large file that will take some time
        self.model = [i for i in range(1000)]

    def process_all_examples_multi(self, all_examples, nproc=4):
        # setup memory state for processing pool
        with _process_this_lock:
            global _process_this
            _process_this = (self, all_examples)
        # context manager deletes pool when done
        with multiprocessing.Pool(nproc) as pool:
            all_results = list(tqdm(pool.imap_unordered(
                self.process_single_example_2, range(len(all_examples)), chunksize=100),
                total=len(all_examples)))
        return all_results

    def process_all_examples_single(self, all_examples):
        all_results = []
        all_inputs = [(self, example) for example in all_examples]
        for _input in tqdm(all_inputs):
            all_results.append(self.process_single_example(_input))
        return all_results

    @staticmethod
    def process_single_example(inputs):
        self, example = inputs
        result = self.arg1 * self.arg2 * self.model[3] * example
        # lets simulate more work
        for i in range(10000):
            pass
        return result

    @staticmethod
    def process_single_example_2(example_index):
        processor, example = _process_this
        result = processor.arg1 * processor.arg2 * processor.model[3] * example[example_index]
        # lets simulate more work
        for i in range(10000):
            pass
        return result

processor = Processor(-1, 2)
all_examples = list(range(100000))
results = processor.process_all_examples_multi(all_examples)
# vs
results = processor.process_all_examples_single(all_examples)
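As an aside, if you ever need this on a platform that spawns new processes instead of forking (the Windows and newer macOS defaults), the fork-time memory sharing above goes away, and a sketch of the same idea using a Pool initializer would look something like the following. The names _worker_state, _worker_init and _process_one are my own for illustration, not part of the multiprocessing API, and each worker pays the model-loading cost once at startup.

from tqdm import tqdm
import multiprocessing

# per-worker global state, filled in by the pool initializer
_worker_state = {}

def _worker_init(arg1, arg2):
    # runs once in every worker process; load anything big here
    _worker_state["arg1"] = arg1
    _worker_state["arg2"] = arg2
    _worker_state["model"] = [i for i in range(1000)]  # stand-in for the real model load

def _process_one(example):
    # only the small example payload crosses the process boundary
    return _worker_state["arg1"] * _worker_state["arg2"] * _worker_state["model"][3] * example

def process_all_examples(all_examples, arg1, arg2, nproc=4):
    with multiprocessing.Pool(nproc, initializer=_worker_init, initargs=(arg1, arg2)) as pool:
        return list(tqdm(pool.imap_unordered(_process_one, all_examples, chunksize=100),
                         total=len(all_examples)))

if __name__ == "__main__":
    results = process_all_examples(list(range(100000)), -1, 2)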
Upvotes: 1