rkdnl rkdnl
rkdnl rkdnl

Reputation: 33

How to keep the same number of threads on Python all the time?

This is a part of my code.

For example, even if 3 out of 10 ends first, I want the next 3 to start right away and always keep 10 threads running.

However, the current code is moving on to the next 10 only when all 10 are completely finished.

How can I modify the code?

I always want to keep a certain number of threads, but as it stands, I have to finish the previous 10 threads completely before moving on to the next 10.

import concurrent.futures
import time

def example_task(n):
    print(f"Task {n} started.")
    time.sleep(n)
    print(f"Task {n} completed.")
    return n

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    futures = []

    for i in range(10):
        futures.append(executor.submit(example_task, i+1))

    for future in concurrent.futures.as_completed(futures):
        try:
            result = future.result()
            print(f"Result of task: {result}")
        
            next_task = len(futures) + 1
                futures.append(executor.submit(example_task, next_task))
        except Exception as e:
            print(f"Error: {e}")

Upvotes: 2

Views: 36

Answers (1)

Alessio
Alessio

Reputation: 148

Use a queue-based approach with ThreadPoolExecutor, where tasks are continuously submitted as soon as one completes.

import concurrent.futures
import time
import itertools

def example_task(n):
    print(f"Task {n} started.")
    time.sleep(n)  # Simulate work
    print(f"Task {n} completed.")
    return n

def main():
    max_threads = 5
    total_tasks = 20  # Total number of tasks you want to process
    task_counter = itertools.count(1)  # Infinite counter for task numbers

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = {}

        # Submit initial batch of tasks
        for _ in range(max_threads):
            task_id = next(task_counter)
            futures[executor.submit(example_task, task_id)] = task_id

        # Process tasks dynamically
        while futures:
            done, _ = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)

            for future in done:
                task_id = futures.pop(future)  # Remove completed task
                try:
                    result = future.result()
                    print(f"Result of task {result}")

                    # Submit a new task if we haven't reached the total task limit
                    if task_id < total_tasks:
                        new_task_id = next(task_counter)
                        futures[executor.submit(example_task, new_task_id)] = new_task_id
                    else:
                        print("FINISHING")
                        break

                except Exception as e:
                    print(f"Task {task_id} failed: {e}")

if __name__ == "__main__":
    main()


Upvotes: 2

Related Questions