Reputation: 21343
I am using multiprocessing and tqdm to show the progress of the workers. I want to add a line under the progress bar to show which tasks are currently being processed. Unfortunately, whatever I do seems to end up with this being printed on top of the progress bar making a mess. Here is a MWE that shows the problem:
from multiprocessing import Pool, Manager, Value
import time
import os
import tqdm
import sys
class ParallelProcessor:
def __init__(self, shared_data):
self.shared_data = shared_data
def process_task(self, args):
"""Worker function: Simulates task processing and updates progress"""
lock, progress, active_tasks, index, integer_arg = args
pid = os.getpid()
core_id = index % len(os.sched_getaffinity(0))
os.sched_setaffinity(pid, {core_id})
with lock:
active_tasks.append(f"Task {index+1}")
time.sleep(2) # Simulate processing time
with lock:
active_tasks.remove(f"Task {index+1}")
progress.value += 1
return self.shared_data
def progress_updater(self, total_tasks, progress, active_tasks):
"""Update tqdm progress bar and active task list on separate lines"""
sys.stdout.write("\n") # Move to the next line for active task display
sys.stdout.flush()
with tqdm.tqdm(total=total_tasks, desc="Processing Tasks", position=0, leave=True) as pbar:
while pbar.n < total_tasks:
time.sleep(0.1) # Update interval
pbar.n = progress.value
pbar.refresh()
# Move cursor down to the next line and overwrite active task display
sys.stdout.write("\033[s") # Save cursor position
sys.stdout.write(f"\033[2K\rActive: {', '.join(active_tasks[:5])}") # Clear line and print active tasks
sys.stdout.write("\033[u") # Restore cursor position
sys.stdout.flush()
def run_parallel(self, tasks, num_cores=None):
"""Runs tasks in parallel with a progress bar"""
num_cores = num_cores or len(os.sched_getaffinity(0))
manager = Manager()
lock = manager.Lock()
progress = manager.Value("i", 0) # Shared integer for progress tracking
active_tasks = manager.list() # Shared list for active tasks
# Start progress updater in the main process
from threading import Thread
progress_thread = Thread(target=self.progress_updater, args=(len(tasks), progress, active_tasks))
progress_thread.start()
# Prepare task arguments
task_args = [(lock, progress, active_tasks, idx, val) for idx, val in enumerate(tasks)]
# Run parallel tasks
with Pool(num_cores) as pool:
results = pool.map(self.process_task, task_args)
# Ensure progress bar finishes
progress_thread.join()
print("\n") # Move to the next line after processing
return results
if __name__ == "__main__":
processor = ParallelProcessor(shared_data=10)
processor.run_parallel(tasks=range(40), num_cores=4)
Upvotes: 2
Views: 81
Reputation: 3211
You can add a separate bar at the bottom that displays only tasks.
def progress_updater(self, total_tasks, progress, active_tasks):
"""Update tqdm progress bar and active task list on separate lines"""
sys.stdout.write("\n") # Move to the next line for active task display
sys.stdout.flush()
with (
tqdm.tqdm(total=total_tasks, desc="Processing Tasks", position=0, leave=True) as pbar,
tqdm.tqdm(bar_format="{desc}", position=1, leave=False) as task_bar,
):
while pbar.n < total_tasks:
time.sleep(0.1) # Update interval
pbar.n = progress.value
pbar.refresh()
# # Move cursor down to the next line and overwrite active task display
# sys.stdout.write("\033[s") # Save cursor position
# sys.stdout.write(f"\033[2K\rActive: {', '.join(active_tasks[:5])}") # Clear line and print active tasks
# sys.stdout.write("\033[u") # Restore cursor position
# sys.stdout.flush()
task_bar.set_description_str(f"Active: {', '.join(active_tasks[:5])}")
Processing Tasks: 20%|█████ | 8/40 [00:05<00:21, 1.47it/s]
Active: Task 3, Task 6, Task 9, Task 12
Upvotes: 3