SamStephens

Reputation: 5861

How do I wait when all ThreadPoolExecutor threads are busy?

My understanding of how a ThreadPoolExecutor works is that when I call #submit, tasks are assigned to threads until all available threads are busy, at which point the executor puts further tasks in a queue to await a thread becoming available.
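
For illustration, here's a rough sketch of that queueing behavior (the task function and timings are made up): all five submits return immediately even though only two workers exist, and the extra tasks just sit in the executor's internal queue.

    import time
    from concurrent.futures import ThreadPoolExecutor

    def slow_task(n):
        time.sleep(1)  # stand-in for real work
        return n

    with ThreadPoolExecutor(max_workers=2) as executor:
        start = time.monotonic()
        # All five submit calls return immediately; three tasks wait in the
        # executor's internal queue until one of the two workers frees up.
        futures = [executor.submit(slow_task, n) for n in range(5)]
        print(f"submitted after {time.monotonic() - start:.2f}s")  # ~0.00s
        print([f.result() for f in futures])                       # [0, 1, 2, 3, 4]
        print(f"finished after {time.monotonic() - start:.2f}s")   # ~3s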

The behavior I want is to block when there is no thread available, wait until one becomes available, and only then submit my task.

The background is that my tasks are coming from a queue, and I only want to pull messages off my queue when there are threads available to work on these messages.

In an ideal world, I'd be able to pass an option to #submit telling it to block if a thread is not available, rather than queueing the task.

However, that option does not exist. So what I'm looking at is something like:

    with concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENCY) as executor:
        while True:
            wait_for_available_thread(executor)
            message = pull_from_queue()
            executor.submit(do_work_for_message, message)

And I'm not sure of the cleanest implementation of wait_for_available_thread.

Honestly, I'm surprised this isn't actually in concurrent.futures, as I would have thought the pattern of pulling from a queue and submitting to a thread pool executor would be relatively common.

Upvotes: 4

Views: 2713

Answers (2)

Tuzki Jack

Reputation: 41

Based on @Samwise's answer (https://stackoverflow.com/a/73396000/8388869), I have extended ThreadPoolExecutor:

    import time
    from concurrent.futures import Future, ThreadPoolExecutor


    class AvailableThreadPoolExecutor(ThreadPoolExecutor):
        """ThreadPoolExecutor that keeps track of the number of available workers.

        Refs:
            inspired by https://stackoverflow.com/a/73396000/8388869
        """

        def __init__(
            self, max_workers=None, thread_name_prefix="", initializer=None, initargs=()
        ):
            super().__init__(max_workers, thread_name_prefix, initializer, initargs)
            self._running_worker_futures: set[Future] = set()

        @property
        def available_workers(self) -> int:
            """the number of available workers"""
            return self._max_workers - len(self._running_worker_futures)

        def wait_for_available_worker(self, timeout: float | None = None) -> None:
            """wait until there is an available worker

            Args:
                timeout: the maximum time to wait in seconds. If None, wait indefinitely.

            Raises:
                TimeoutError: if the timeout is reached.
            """

            start_time = time.monotonic()
            while True:
                if self.available_workers > 0:
                    return
                if timeout is not None and time.monotonic() - start_time > timeout:
                    raise TimeoutError
                time.sleep(0.1)

        def submit(self, fn, /, *args, **kwargs):
            f = super().submit(fn, *args, **kwargs)
            self._running_worker_futures.add(f)
            f.add_done_callback(self._running_worker_futures.remove)
            return f

It can then be used like this:

    with AvailableThreadPoolExecutor(max_workers=CONCURRENCY) as executor:
        while True:
            executor.wait_for_available_worker()
            message = pull_from_queue()
            executor.submit(do_work_for_message, message)

Upvotes: 1

Samwise

Reputation: 71444

One approach might be to keep track of your currently running threads via a set of Futures:

    import concurrent.futures
    import time

    active_threads = set()

    def pop_future(future):
        # done callback: remove the completed future from the tracking set
        active_threads.discard(future)

    with concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENCY) as executor:
        while True:
            while len(active_threads) >= CONCURRENCY:
                time.sleep(0.1)  # or whatever
            message = pull_from_queue()
            future = executor.submit(do_work_for_message, message)    
            active_threads.add(future)
            future.add_done_callback(pop_future)

A more sophisticated approach might be to have the done_callback be the thing that triggers a queue pull, rather than polling and blocking, but then you need to fall back to polling the queue if the workers manage to get ahead of it.
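
Here's a very rough, untested sketch of that chained version, assuming a queue.Queue stands in for the message source and reusing the names from the question (do_work_for_message, CONCURRENCY); the helper names are mine:

    import queue
    from concurrent.futures import ThreadPoolExecutor

    CONCURRENCY = 4  # same constant as in the question
    message_queue = queue.Queue()  # stand-in for whatever pull_from_queue reads

    def do_work_for_message(message):
        ...  # the real work goes here

    def pull_from_queue_or_wait():
        # Fallback polling: if the workers have gotten ahead of the queue,
        # keep retrying until a message shows up.
        while True:
            try:
                return message_queue.get(timeout=1.0)
            except queue.Empty:
                continue

    executor = ThreadPoolExecutor(max_workers=CONCURRENCY)

    def submit_next(_finished_future=None):
        # Called once per worker at startup and then from each future's done
        # callback, so a new message is only pulled when a worker just freed up.
        message = pull_from_queue_or_wait()
        future = executor.submit(do_work_for_message, message)
        future.add_done_callback(submit_next)

    # Seed one pull -> work -> callback chain per worker.
    for _ in range(CONCURRENCY):
        submit_next()

Note that the done callback runs in the worker thread itself, so the polling ties up that worker while it waits, which is fine here since it has nothing else to do anyway.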

Upvotes: 2
