Simd
Simd

Reputation: 21343

tqdm, multiprocessing and how to print a line under the progress bar

I am using multiprocessing and tqdm to show the progress of the workers. I want to add a line under the progress bar to show which tasks are currently being processed. Unfortunately, whatever I do seems to end up with this being printed on top of the progress bar making a mess. Here is a MWE that shows the problem:

from multiprocessing import Pool, Manager, Value
import time
import os
import tqdm
import sys

class ParallelProcessor:
    def __init__(self, shared_data):
        self.shared_data = shared_data

    def process_task(self, args):
        """Worker function: Simulates task processing and updates progress"""
        lock, progress, active_tasks, index, integer_arg = args
        pid = os.getpid()
        core_id = index % len(os.sched_getaffinity(0))
        os.sched_setaffinity(pid, {core_id})

        with lock:
            active_tasks.append(f"Task {index+1}")

        time.sleep(2)  # Simulate processing time

        with lock:
            active_tasks.remove(f"Task {index+1}")
            progress.value += 1

        return self.shared_data

    def progress_updater(self, total_tasks, progress, active_tasks):
        """Update tqdm progress bar and active task list on separate lines"""
        sys.stdout.write("\n")  # Move to the next line for active task display
        sys.stdout.flush()

        with tqdm.tqdm(total=total_tasks, desc="Processing Tasks", position=0, leave=True) as pbar:
            while pbar.n < total_tasks:
                time.sleep(0.1)  # Update interval
                pbar.n = progress.value
                pbar.refresh()

                # Move cursor down to the next line and overwrite active task display
                sys.stdout.write("\033[s")  # Save cursor position
                sys.stdout.write(f"\033[2K\rActive: {', '.join(active_tasks[:5])}")  # Clear line and print active tasks
                sys.stdout.write("\033[u")  # Restore cursor position
                sys.stdout.flush()

    def run_parallel(self, tasks, num_cores=None):
        """Runs tasks in parallel with a progress bar"""
        num_cores = num_cores or len(os.sched_getaffinity(0))
        manager = Manager()
        lock = manager.Lock()
        progress = manager.Value("i", 0)  # Shared integer for progress tracking
        active_tasks = manager.list()  # Shared list for active tasks

        # Start progress updater in the main process
        from threading import Thread
        progress_thread = Thread(target=self.progress_updater, args=(len(tasks), progress, active_tasks))
        progress_thread.start()

        # Prepare task arguments
        task_args = [(lock, progress, active_tasks, idx, val) for idx, val in enumerate(tasks)]

        # Run parallel tasks
        with Pool(num_cores) as pool:
            results = pool.map(self.process_task, task_args)

        # Ensure progress bar finishes
        progress_thread.join()
        print("\n")  # Move to the next line after processing

        return results


if __name__ == "__main__":
    processor = ParallelProcessor(shared_data=10)
    processor.run_parallel(tasks=range(40), num_cores=4)

Upvotes: 2

Views: 81

Answers (1)

ken
ken

Reputation: 3211

You can add a separate bar at the bottom that displays only tasks.

def progress_updater(self, total_tasks, progress, active_tasks):
    """Update tqdm progress bar and active task list on separate lines"""
    sys.stdout.write("\n")  # Move to the next line for active task display
    sys.stdout.flush()

    with (
        tqdm.tqdm(total=total_tasks, desc="Processing Tasks", position=0, leave=True) as pbar,
        tqdm.tqdm(bar_format="{desc}", position=1, leave=False) as task_bar,
    ):
        while pbar.n < total_tasks:
            time.sleep(0.1)  # Update interval
            pbar.n = progress.value
            pbar.refresh()

            # # Move cursor down to the next line and overwrite active task display
            # sys.stdout.write("\033[s")  # Save cursor position
            # sys.stdout.write(f"\033[2K\rActive: {', '.join(active_tasks[:5])}")  # Clear line and print active tasks
            # sys.stdout.write("\033[u")  # Restore cursor position
            # sys.stdout.flush()
            task_bar.set_description_str(f"Active: {', '.join(active_tasks[:5])}")
Processing Tasks:  20%|█████                    | 8/40 [00:05<00:21,  1.47it/s]
Active: Task 3, Task 6, Task 9, Task 12

Upvotes: 3

Related Questions