Reputation: 2510
After encountering some probable memory leaks in a long-running multithreaded script, I found out about maxtasksperchild, which can be used in a multiprocessing pool like this:
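import multiprocessing

# x = number of tasks a worker process completes before it is replaced
with multiprocessing.Pool(processes=32, maxtasksperchild=x) as pool:
    # imap is lazy: consume its results inside the block, because leaving the
    # with-block calls pool.terminate(), which stops workers with tasks still pending
    results = list(pool.imap(function, stuff))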
Is something similar possible for ThreadPool (multiprocessing.pool.ThreadPool)?
Upvotes: 8
Views: 676
Reputation: 207
I wanted a ThreadPool that starts a new task as soon as another task in the pool completes (i.e. maxtasksperchild=1). I decided to write a small "ThreadPool" class that creates a new thread for every task: as soon as a task in the pool completes, another thread is created for the next value in the iterable passed to the map method. The map method blocks until all values in the passed iterable have been processed and their threads have finished.
import threading
import time


class ThreadPool():
    def __init__(self, processes=20):
        self.processes = processes
        # one slot per concurrent worker; each slot runs at most one thread at a time
        self.threads = [Thread() for _ in range(processes)]

    def get_dead_threads(self):
        # slots whose thread has finished (or never started) are free for a new task
        return [thread for thread in self.threads if not thread.is_alive()]

    def is_thread_running(self):
        return len(self.get_dead_threads()) < self.processes

    def map(self, func, values):
        values_iter = iter(values)
        exhausted = False
        # loop until all values have been handed to a thread and
        # all threads are finished running (works for any iterable, not only lists)
        while not exhausted or self.is_thread_running():
            for thread in self.get_dead_threads():
                try:
                    value = next(values_iter)
                except StopIteration:
                    exhausted = True
                    break
                # run a brand-new thread with the next value
                thread.run(func, value)
            # sleep briefly instead of busy-waiting on the slots
            time.sleep(0.01)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        pass


class Thread():
    def __init__(self):
        self.thread = None

    def run(self, target, *args, **kwargs):
        # each call creates and starts a fresh threading.Thread
        self.thread = threading.Thread(target=target, args=args, kwargs=kwargs)
        self.thread.start()

    def is_alive(self):
        return self.thread is not None and self.thread.is_alive()
You can use it like this:
# a plain function: map calls it as run_job(value), so it must not take self
def run_job(value, mp_queue=None):
    # do something with value
    value += 1

with ThreadPool(processes=2) as pool:
    pool.map(run_job, [1, 2, 3, 4, 5])
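The same idea (a brand-new thread for every task, with at most N threads alive) can also be sketched without the polling loop by letting a threading.BoundedSemaphore gate thread creation. This is a minimal sketch, not the answerer's code; run_with_fresh_threads and max_running are hypothetical names.

```python
import threading


def run_with_fresh_threads(func, values, max_running=2):
    """Hypothetical helper: run func(value) for each value, each call in its
    own new thread, with at most max_running threads alive at once."""
    slots = threading.BoundedSemaphore(max_running)
    lock = threading.Lock()
    results = {}
    threads = []

    def worker(index, value):
        try:
            result = func(value)
            with lock:
                results[index] = result
        finally:
            # free the slot even if func raises, so the next thread can start
            slots.release()

    for index, value in enumerate(values):
        slots.acquire()  # blocks while max_running threads are still running
        thread = threading.Thread(target=worker, args=(index, value))
        thread.start()
        threads.append(thread)

    for thread in threads:
        thread.join()

    # results in input order; None for calls that raised
    return [results.get(i) for i in range(len(threads))]


print(run_with_fresh_threads(lambda v: v + 1, [1, 2, 3, 4, 5]))  # [2, 3, 4, 5, 6]
```

The blocking acquire() replaces the sleep-and-poll loop: the main thread simply waits until a finished task releases its slot.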
Upvotes: 0
Reputation: 2047
As the answer by noxdafox says, ThreadPool offers no way to do this, but you can use the threading module directly to control how many tasks each thread handles. Since multiprocessing.pool.ThreadPool is itself built on threads, the threading module is a close substitute:
import threading

def split_processing(yourlist, function, num_splits=4):
    '''
    yourlist   = list you want to pass to function for threading.
    function   = callable run in each thread as function(yourlist, start, end).
    num_splits = number of threads; each handles one contiguous slice of yourlist.
    '''
    split_size = len(yourlist) // num_splits
    threads = []
    for i in range(num_splits):
        start = i * split_size
        # the last thread also takes any remainder
        end = len(yourlist) if i + 1 == num_splits else (i + 1) * split_size
        threads.append(threading.Thread(target=function, args=(yourlist, start, end)))
        threads[-1].start()

    # wait for all threads to finish
    for t in threads:
        t.join()
Let's say yourlist has 100 items. Then:
num_splits = 10 gives 10 threads, each handling 10 items.
num_splits = 5 gives 5 threads, each handling 20 items.
num_splits = 50 gives 50 threads, each handling 2 items.
If the length is not evenly divisible by num_splits, the last thread also takes the remaining items.
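To see exactly which slice each thread receives, the start/end arithmetic from split_processing can be pulled into a small pure function. split_bounds is a hypothetical helper that mirrors that arithmetic; it starts no threads.

```python
def split_bounds(length, num_splits=4):
    """Hypothetical helper: the (start, end) bounds split_processing
    would pass to each of its num_splits threads."""
    split_size = length // num_splits
    bounds = []
    for i in range(num_splits):
        start = i * split_size
        # the last chunk runs to the end, absorbing any remainder
        end = length if i + 1 == num_splits else (i + 1) * split_size
        bounds.append((start, end))
    return bounds


print(split_bounds(100, 5))  # [(0, 20), (20, 40), (40, 60), (60, 80), (80, 100)]
print(split_bounds(103, 5))  # [(0, 20), (20, 40), (40, 60), (60, 80), (80, 103)]
print(split_bounds(3, 5))    # [(0, 0), (0, 0), (0, 0), (0, 0), (0, 3)]
```

The last call shows an edge case of this scheme: when num_splits exceeds the list length, split_size is 0, so every thread except the last gets an empty slice.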
Upvotes: 3
Reputation: 15040
Looking at the multiprocessing.pool.ThreadPool implementation, it becomes evident that the maxtasksperchild parameter is not propagated to the parent multiprocessing.Pool class. The multiprocessing.pool.ThreadPool implementation has never been completed, hence it lacks a few features (as well as tests and documentation).
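This can be checked from Python itself. A quick sketch, assuming a CPython 3 standard library: Pool's constructor accepts maxtasksperchild, while ThreadPool's own constructor does not, so passing the keyword to ThreadPool fails before any thread is created.

```python
import inspect
from multiprocessing.pool import Pool, ThreadPool

pool_params = inspect.signature(Pool.__init__).parameters
thread_pool_params = inspect.signature(ThreadPool.__init__).parameters

print("maxtasksperchild" in pool_params)         # True
print("maxtasksperchild" in thread_pool_params)  # False

try:
    ThreadPool(processes=2, maxtasksperchild=1)
    rejected = False
except TypeError:
    # ThreadPool.__init__ has no such keyword, so it never reaches Pool.__init__
    rejected = True

print(rejected)  # True
```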
The pebble package implements a ThreadPool which supports restarting workers after a given number of tasks have been processed.
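If adding a dependency is not an option, worker recycling can be approximated with the standard library: each worker thread processes at most max_tasks queued tasks, then exits and starts its own replacement. This is a minimal sketch, not pebble's API; RecyclingThreadPool, max_tasks and the other names are hypothetical, and a failed call returns its exception object as the result instead of raising. Note that recycling threads only discards per-thread state: all threads share one process, so memory leaked into process-wide state is not reclaimed the way it is when maxtasksperchild replaces a worker process.

```python
import queue
import threading


class RecyclingThreadPool:
    """Hypothetical sketch: every worker thread exits after max_tasks tasks
    and is replaced by a brand-new thread (maxtasksperchild-style recycling)."""

    _STOP = object()  # sentinel telling a worker to exit without a replacement

    def __init__(self, workers=4, max_tasks=1):
        self.workers = workers
        self.max_tasks = max_tasks
        self.tasks = queue.Queue()
        self.lock = threading.Lock()
        self.threads = []
        self.started = 0  # total number of worker threads ever created
        for _ in range(workers):
            self._spawn()

    def _spawn(self):
        thread = threading.Thread(target=self._worker, daemon=True)
        with self.lock:
            self.threads.append(thread)
            self.started += 1
        thread.start()

    def _worker(self):
        for _ in range(self.max_tasks):
            item = self.tasks.get()
            if item is self._STOP:
                self.tasks.task_done()
                return  # shutting down: exit for good
            func, value, results, index = item
            try:
                results[index] = func(value)
            except Exception as exc:
                # keep the worker alive; report the error as the result
                results[index] = exc
            finally:
                self.tasks.task_done()
        # task budget used up: retire this thread and start a fresh one
        self._spawn()

    def map(self, func, values):
        values = list(values)
        results = [None] * len(values)
        for index, value in enumerate(values):
            self.tasks.put((func, value, results, index))
        self.tasks.join()  # block until every queued task is marked done
        return results

    def close(self):
        # one sentinel per consumer; the number of consumers stays at self.workers
        for _ in range(self.workers):
            self.tasks.put(self._STOP)
        # join repeatedly, since a retiring worker may spawn a late replacement
        while True:
            with self.lock:
                alive = [t for t in self.threads if t.is_alive()]
            if not alive:
                return
            for thread in alive:
                thread.join()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()


with RecyclingThreadPool(workers=2, max_tasks=1) as pool:
    print(pool.map(lambda v: v * v, range(6)))  # [0, 1, 4, 9, 16, 25]
```

With max_tasks=1 every task runs on a thread that has never run anything before, which is the closest thread-based analogue to maxtasksperchild=1.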
Upvotes: 0