Fabian Bosler

Reputation: 2510

Is it possible to set maxtasksperchild for a threadpool?

After encountering some probable memory leaks in a long-running multithreaded script, I found out about maxtasksperchild, which can be used with a multiprocessing pool like this:

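import multiprocessing

# function, stuff and x are placeholders (worker, inputs, tasks per worker process)
with multiprocessing.Pool(processes=32, maxtasksperchild=x) as pool:
    # imap is lazy; consume it before the with block terminates the pool
    results = list(pool.imap(function, stuff))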

Is something similar possible for the ThreadPool (multiprocessing.pool.ThreadPool)?

Upvotes: 8

Views: 676

Answers (3)

Nick Falco

Reputation: 207

I wanted a ThreadPool that runs every task in a fresh thread and starts the next task as soon as another task in the pool completes (i.e. the equivalent of maxtasksperchild=1). So I wrote a small "ThreadPool" class that creates a new thread for every task. As soon as a task in the pool completes, another thread is created for the next value in the iterable passed to the map method. The map method blocks until all values in the passed iterable have been processed and their threads have finished.

import threading
import time


class ThreadPool:
    """Run each task in a brand-new thread, with at most `processes` threads alive at once."""

    def __init__(self, processes=20):
        self.processes = processes
        self.threads = [Thread() for _ in range(processes)]

    def get_dead_threads(self):
        # slots whose thread has finished (or never started) can take a new task
        return [thread for thread in self.threads if not thread.is_alive()]

    def is_thread_running(self):
        return len(self.get_dead_threads()) < self.processes

    def map(self, func, values):
        values = list(values)  # len() is used below, so accept any iterable
        attempted_count = 0
        values_iter = iter(values)
        # loop until all values have been attempted to be processed and
        # all threads are finished running
        while attempted_count < len(values) or self.is_thread_running():
            for thread in self.get_dead_threads():
                try:
                    value = next(values_iter)
                except StopIteration:
                    break
                attempted_count += 1
                # run a fresh thread with the next value
                thread.run(func, value)
            # avoid burning 100% CPU while polling for finished threads
            time.sleep(0.01)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        pass


class Thread:
    """One pool slot; every call to run() starts a new threading.Thread."""

    def __init__(self):
        self.thread = None

    def run(self, target, *args, **kwargs):
        self.thread = threading.Thread(target=target, args=args, kwargs=kwargs)
        self.thread.start()

    def is_alive(self):
        return self.thread is not None and self.thread.is_alive()

You can use it like this:

def run_job(value):
    # do something with value (map calls run_job(value), so no self parameter)
    value += 1


with ThreadPool(processes=2) as pool:
    pool.map(run_job, [1, 2, 3, 4, 5])
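For comparison, here is a minimal sketch of the same "one fresh thread per task" idea using only the standard library. It is not the answer's code: the function name map_fresh_threads and its max_workers parameter are hypothetical. A threading.BoundedSemaphore caps the number of live threads, so the main thread blocks instead of polling, and the results come back in input order.

```python
import threading


def map_fresh_threads(func, values, max_workers=2):
    """Hypothetical helper: call func(value) for every value, each call in a
    brand-new thread, with at most max_workers threads alive at once.
    Returns the results in input order."""
    values = list(values)
    results = [None] * len(values)
    slots = threading.BoundedSemaphore(max_workers)
    threads = []

    def worker(index, value):
        try:
            results[index] = func(value)
        finally:
            # free the slot even if func raised (its result then stays None)
            slots.release()

    for index, value in enumerate(values):
        slots.acquire()  # blocks until a running thread finishes; no polling
        thread = threading.Thread(target=worker, args=(index, value))
        thread.start()
        threads.append(thread)

    for thread in threads:
        thread.join()
    return results


print(map_fresh_threads(lambda v: v + 1, [1, 2, 3, 4, 5]))  # [2, 3, 4, 5, 6]
```

If func raises, threading's default excepthook prints the traceback and that position in the result list stays None; wrap func yourself if you need to collect the errors.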

Upvotes: 0

ASHu2

Reputation: 2047

As noxdafox's answer says, there is no way to do this through the parent class. Instead, you can use the threading module directly to control how many tasks each thread handles. Since multiprocessing.pool.ThreadPool is built on threads anyway, the threading module is a close substitute:

import threading


def split_processing(yourlist, num_splits=4):
    '''
    yourlist = list which you want to pass to function for threading.
    num_splits = number of threads, i.e. how many chunks yourlist is split into.

    `function` is your own worker; it is called as function(yourlist, start, end)
    and should process yourlist[start:end].
    '''
    split_size = len(yourlist) // num_splits
    threads = []
    for i in range(num_splits):
        start = i * split_size
        # the last thread also takes any remainder
        end = len(yourlist) if i + 1 == num_splits else (i + 1) * split_size
        threads.append(threading.Thread(target=function, args=(yourlist, start, end)))
        threads[-1].start()

    # wait for all threads to finish
    for t in threads:
        t.join()

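Let's say yourlist has 100 items. Then:

if num_splits = 10, there are 10 threads and each thread handles 10 tasks.
if num_splits = 5, there are 5 threads and each thread handles 20 tasks.
if num_splits = 50, there are 50 threads and each thread handles 2 tasks.

Conversely, to get a given number of tasks per thread, set num_splits = len(yourlist) // tasks_per_thread.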
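The chunk arithmetic can be checked on its own. The sketch below pulls the start/end computation out of split_processing into a hypothetical helper, chunk_bounds, that returns the (start, end) bounds each thread would receive instead of starting threads:

```python
def chunk_bounds(n_items, num_splits):
    """Hypothetical helper: the (start, end) bounds split_processing hands to
    each thread. Chunks have n_items // num_splits items; the last chunk also
    absorbs any remainder."""
    split_size = n_items // num_splits
    bounds = []
    for i in range(num_splits):
        start = i * split_size
        end = n_items if i + 1 == num_splits else (i + 1) * split_size
        bounds.append((start, end))
    return bounds


for splits in (10, 5, 50):
    sizes = {end - start for start, end in chunk_bounds(100, splits)}
    print(splits, "threads ->", sizes, "items each")

print(chunk_bounds(10, 3))  # [(0, 3), (3, 6), (6, 10)]
```

One consequence of this scheme: if num_splits exceeds the number of items, split_size becomes 0, so every chunk except the last is empty and the last thread does all the work.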

Upvotes: 3

noxdafox

Reputation: 15040

Looking at the multiprocessing.pool.ThreadPool implementation, it becomes evident that the maxtasksperchild parameter is not propagated to the parent multiprocessing.Pool class: ThreadPool.__init__ accepts only processes, initializer and initargs. The multiprocessing.pool.ThreadPool implementation has never been completed, hence it lacks a few features (as well as tests and documentation).
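A quick way to confirm this on CPython is to compare the two constructor signatures and try passing the argument (the exact error message may vary between Python versions):

```python
import inspect
from multiprocessing.pool import Pool, ThreadPool

# Pool accepts maxtasksperchild; ThreadPool's own __init__ does not
print("Pool:", "maxtasksperchild" in inspect.signature(Pool.__init__).parameters)
print("ThreadPool:", "maxtasksperchild" in inspect.signature(ThreadPool.__init__).parameters)

try:
    ThreadPool(processes=2, maxtasksperchild=1)
except TypeError as exc:
    print("TypeError:", exc)
```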

The pebble package implements a ThreadPool that supports restarting workers after a given number of tasks have been processed.
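If adding a dependency is not an option, the behaviour can be approximated with the standard library. The sketch below is hypothetical: RecyclingThreadPool, workers, max_tasks and threads_started are names made up for this example and are not pebble's API. A fixed number of worker threads pull tasks from a queue.Queue, and each thread exits after max_tasks tasks, starting its replacement first. Keep in mind that threads share memory, so recycling a thread only frees thread-local state; leaks held in module-level objects or caches survive, unlike with a process pool.

```python
import queue
import threading


class RecyclingThreadPool:
    """Hypothetical sketch: `workers` threads pull tasks from a shared queue;
    each thread exits after `max_tasks` tasks and is replaced by a brand-new
    thread, loosely mimicking Pool(maxtasksperchild=...)."""

    _STOP = object()  # sentinel: tells one worker to exit without a successor

    def __init__(self, workers=4, max_tasks=1):
        self.workers = workers
        self.max_tasks = max_tasks
        self.tasks = queue.Queue()
        self._lock = threading.Lock()
        self.threads_started = 0  # total worker threads ever created
        for _ in range(workers):
            self._spawn()

    def _spawn(self):
        with self._lock:
            self.threads_started += 1
        threading.Thread(target=self._worker, daemon=True).start()

    def _worker(self):
        for _ in range(self.max_tasks):
            item = self.tasks.get()
            if item is self._STOP:
                self.tasks.task_done()
                return  # shutting down: no replacement thread
            func, value, results, index = item
            try:
                results[index] = func(value)
            except Exception as exc:
                # store the error instead of letting this worker die unreplaced
                results[index] = exc
            finally:
                self.tasks.task_done()
        # task budget used up: start a fresh thread, then let this one end
        self._spawn()

    def map(self, func, values):
        values = list(values)
        results = [None] * len(values)
        for index, value in enumerate(values):
            self.tasks.put((func, value, results, index))
        self.tasks.join()  # block until every queued task is marked done
        return results

    def close(self):
        # one sentinel per live worker; wait until every worker has taken one
        for _ in range(self.workers):
            self.tasks.put(self._STOP)
        self.tasks.join()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()


with RecyclingThreadPool(workers=2, max_tasks=1) as pool:
    names = pool.map(lambda v: threading.current_thread().name, range(6))
print(len(set(names)), "distinct threads for 6 tasks")
```

With max_tasks=1 every task runs in a thread that never sees another task, which is the thread analogue of maxtasksperchild=1. Exceptions raised by func are returned in the result list rather than re-raised, a design choice made here so a failing task does not shrink the pool.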

Upvotes: 0
