Reputation: 5861
My understanding of how a ThreadPoolExecutor works is that when I call #submit, tasks are assigned to threads until all available threads are busy, at which point the executor puts the tasks in a queue awaiting a thread becoming available.
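For example (a quick check with a toy sleep task, names hypothetical), submitting four tasks to a two-worker pool returns immediately from every call to submit; the extra tasks just wait in the executor's internal queue:

import concurrent.futures
import time

def task(n):
    time.sleep(1)  # simulate some work
    return n

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # All four submit calls return immediately, even though only two tasks can run at once.
    futures = [executor.submit(task, n) for n in range(4)]
    print([f.running() for f in futures])  # typically [True, True, False, False]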
The behavior I want is to block when there is no thread available, to wait until one becomes available and only then submit my task.
The background is that my tasks are coming from a queue, and I only want to pull messages off my queue when there are threads available to work on these messages.
In an ideal world, I'd be able to provide an option to #submit to tell it to block if a thread is not available, rather than putting the task in a queue.
However, that option does not exist. So what I'm looking at is something like:
with concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENCY) as executor:
    while True:
        wait_for_available_thread(executor)
        message = pull_from_queue()
        executor.submit(do_work_for_message, message)
And I'm not sure of the cleanest implementation of wait_for_available_thread.
Honestly, I'm surprised this isn't actually in concurrent.futures, as I would have thought the pattern of pulling from a queue and submitting to a thread pool executor would be relatively common.
Upvotes: 4
Views: 2713
Reputation: 41
Based on @Samwise's answer (https://stackoverflow.com/a/73396000/8388869), I have extended the ThreadPoolExecutor:
import time
from concurrent.futures import Future, ThreadPoolExecutor


class AvailableThreadPoolExecutor(ThreadPoolExecutor):
    """ThreadPoolExecutor that keeps track of the number of available workers.

    Refs:
        inspired by https://stackoverflow.com/a/73396000/8388869
    """

    def __init__(
        self, max_workers=None, thread_name_prefix="", initializer=None, initargs=()
    ):
        super().__init__(max_workers, thread_name_prefix, initializer, initargs)
        self._running_worker_futures: set[Future] = set()

    @property
    def available_workers(self) -> int:
        """the number of available workers"""
        return self._max_workers - len(self._running_worker_futures)

    def wait_for_available_worker(self, timeout: float | None = None) -> None:
        """wait until there is an available worker

        Args:
            timeout: the maximum time to wait in seconds. If None, wait indefinitely.

        Raises:
            TimeoutError: if the timeout is reached.
        """
        start_time = time.monotonic()
        while True:
            if self.available_workers > 0:
                return
            if timeout is not None and time.monotonic() - start_time > timeout:
                raise TimeoutError
            time.sleep(0.1)

    def submit(self, fn, /, *args, **kwargs):
        f = super().submit(fn, *args, **kwargs)
        self._running_worker_futures.add(f)
        f.add_done_callback(self._running_worker_futures.remove)
        return f
It should work like this:
with AvailableThreadPoolExecutor(max_workers=CONCURRENCY) as executor:
    while True:
        executor.wait_for_available_worker()
        message = pull_from_queue()
        executor.submit(do_work_for_message, message)
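For example (using a hypothetical slow_task that just sleeps), filling the pool and then waiting with a short timeout should raise TimeoutError:

import time

def slow_task():
    time.sleep(5)

with AvailableThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(slow_task)
    executor.submit(slow_task)
    print(executor.available_workers)  # 0 -- both workers are busy
    try:
        executor.wait_for_available_worker(timeout=0.5)
    except TimeoutError:
        print("no worker became available within 0.5 s")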
Upvotes: 1
Reputation: 71444
One approach might be to keep track of your currently running threads via a set of Futures:
import concurrent.futures
import time

active_threads = set()

def pop_future(future):
    # remove the completed future from the set of running tasks
    active_threads.discard(future)

with concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENCY) as executor:
    while True:
        while len(active_threads) >= CONCURRENCY:
            time.sleep(0.1)  # or whatever
        message = pull_from_queue()
        future = executor.submit(do_work_for_message, message)
        active_threads.add(future)
        future.add_done_callback(pop_future)
A more sophisticated approach might be to have the done_callback be the thing that triggers a queue pull, rather than polling and blocking, but then you need to fall back to polling the queue if the workers manage to get ahead of it.
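A rough sketch of that idea, assuming the messages sit in a standard queue.Queue (a stand-in for whatever pull_from_queue reads) and using a lock-protected counter of in-flight tasks, since the callbacks run on the worker threads:

import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor

CONCURRENCY = 4
message_queue = queue.Queue()  # hypothetical stand-in for the real message source
in_flight = 0                  # tasks submitted and not yet finished
lock = threading.Lock()

def do_work_for_message(message):
    time.sleep(0.5)            # placeholder for the real work

def on_done(executor, finished_future):
    # Runs in the worker thread when a task finishes: try to hand that worker
    # its next message right away, and only count it as idle if the queue is
    # empty at that moment.
    global in_flight
    with lock:
        try:
            message = message_queue.get_nowait()
        except queue.Empty:
            in_flight -= 1
            return
    future = executor.submit(do_work_for_message, message)
    future.add_done_callback(lambda f: on_done(executor, f))

with ThreadPoolExecutor(max_workers=CONCURRENCY) as executor:
    while True:
        # Polling fallback: if the callbacks found the queue empty and some
        # workers went idle, top the pool back up once messages arrive again.
        message = None
        with lock:
            if in_flight < CONCURRENCY:
                try:
                    message = message_queue.get_nowait()
                    in_flight += 1
                except queue.Empty:
                    pass
        if message is None:
            time.sleep(0.1)
            continue
        future = executor.submit(do_work_for_message, message)
        future.add_done_callback(lambda f: on_done(executor, f))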
Upvotes: 2