MrExplore

Reputation: 57

Python multiprocessing progress approach

I've been busy writing my first multiprocessing code and it works, yay. Now, however, I would like some feedback on its progress, and I'm not sure what the best approach would be.

What my code (see below) does, in short: it scans a target directory for video files, then processes each file with OpenCV in its own process and writes a result image to a _Results folder.

What I'm looking for could be:

  1. Simple, e.g. a message printed each time a file completes
  2. Fancy, e.g. a per-core display like:
Core 0  processing file 20 of 317 ||||||____ 60% completed
Core 1  processing file 21 of 317 |||||||||_ 90% completed
...
Core 7  processing file 18 of 317 ||________ 20% completed

I've read all kinds of info about queues, pools and tqdm, and I'm not sure which way to go. Could anyone point me to an approach that would work in this case?

Thanks in advance!

EDIT: Changed the code that starts the processes, as suggested by gsb22.

My code:

# file operations
import os
import glob
# Multiprocessing
from multiprocessing import Process
# Motion detection
import cv2


# >>> Enter directory to scan as target directory
targetDirectory = r"E:\Projects\Programming\Python\OpenCV\videofiles"

def get_videofiles(target_directory):

    # Find all video files in directory and subdirectories and put them in a list
    videofiles = glob.glob(target_directory + '/**/*.mp4', recursive=True)
    # Return the list
    return videofiles


def process_file(videofile):

    '''
    What happens inside this function:
    - The video is processed and analysed using openCV
    - The result (an image) is saved to the results folder
    - Once this function receives the videofile it completes
      without the need to return anything to the main program
    '''

    # The processing code is more complex than this code below, this is just a test
    cap = cv2.VideoCapture(videofile)

    for i in range(10):
        success, frame = cap.read()

        if success:
            try:
                cv2.imwrite('{}/_Results/{}_result_{}.jpg'.format(targetDirectory, os.path.basename(videofile), i), frame)
            except Exception as e:
                print(f'Failed to write frame {i} of {os.path.basename(videofile)}: {e}')


if __name__ == "__main__":

    # Create directory to save results if it doesn't exist
    if not os.path.exists(targetDirectory + '/_Results'):
        os.makedirs(targetDirectory + '/_Results')

    # Get a list of all video files in the target directory
    all_files = get_videofiles(targetDirectory)

    print(f'{len(all_files)} video files found')

    # Create list of jobs (processes)
    jobs = []

    # Create and start processes
    for file in all_files:
        proc = Process(target=process_file, args=(file,))
        jobs.append(proc)

    for job in jobs:
        job.start()

    for job in jobs:
        job.join()

    # TODO: Print some form of progress feedback

    print('Finished :)')

Upvotes: 5

Views: 392

Answers (1)

2e0byo

Reputation: 5954

I've read all kinds of info about queues, pools and tqdm, and I'm not sure which way to go. Could anyone point me to an approach that would work in this case?

Here's a very simple way to get progress indication at minimal cost:

from multiprocessing.pool import Pool
from random import randint
from time import sleep

from tqdm import tqdm


def process(fn) -> bool:
    # Simulate per-file work: sleep a little, then "succeed" ~70% of the time
    sleep(randint(1, 3))
    return randint(0, 100) < 70


files = [f"file-{i}.mp4" for i in range(20)]

success = []
failed = []
NPROC = 5
pool = Pool(NPROC)


for status, fn in tqdm(zip(pool.imap(process, files), files), total=len(files)):
    if status:
        success.append(fn)
    else:
        failed.append(fn)

print(f"{len(success)} succeeded and {len(failed)} failed")

Some comments:

  • tqdm is a third-party library which implements progress bars extremely well. There are others. pip install tqdm.
  • we use a pool (there's almost never a reason to manage processes yourself for simple things like this) of NPROC processes, and we let the pool handle iterating our process function over the input data.
  • we signal state by having the function return a boolean (in this example we choose randomly, weighting in favour of success). We don't return the filename, although we could, because it would have to be serialised and sent back from the subprocess, and that's unnecessary overhead (a variant which does return the filename is sketched just after this list).
  • we use Pool.imap, which returns an iterator that keeps the same order as the iterable we pass in, so we can zip files with the results directly. Since imap yields lazily and its iterator has no length, tqdm needs to be told the total via total=len(files). (We could have used Pool.map, but there's no need to hold every result in RAM at once, although for one bool per file it probably makes no difference.)
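As a variant on the points above (a sketch along the same lines, not the answer's exact code), you can trade a little serialisation overhead for order-independence: have the worker return (filename, status) and use Pool.imap_unordered, which yields each result as soon as any worker finishes:

from multiprocessing.pool import Pool
from random import randint
from time import sleep

from tqdm import tqdm


def process(fn) -> tuple:
    # Return the filename with the status so we no longer rely on result
    # order (this costs a little extra serialisation, as noted above)
    sleep(randint(1, 3))
    return fn, randint(0, 100) < 70


if __name__ == "__main__":
    files = [f"file-{i}.mp4" for i in range(20)]

    success, failed = [], []
    with Pool(5) as pool:
        # imap_unordered yields results as they complete, so the bar
        # ticks at the true completion rate rather than in input order
        for fn, status in tqdm(pool.imap_unordered(process, files), total=len(files)):
            (success if status else failed).append(fn)

    print(f"{len(success)} succeeded and {len(failed)} failed")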

I've deliberately written this as a kind of recipe. You can do a lot with multiprocessing just by using the high-level drop-in patterns, and Pool.[i]map is one of the most useful; a sketch of the question's "fancy" per-core display follows below.
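For the per-core display in the question, one possible extension is tqdm's documented multiprocessing pattern, giving each worker its own bar via the position argument. This is a sketch only: _identity is an undocumented attribute of Process objects, so treat the bar placement as an assumption rather than a guaranteed API.

from multiprocessing import Pool, RLock, current_process, freeze_support
from time import sleep

from tqdm import tqdm


def process(fn) -> bool:
    # Hypothetical stand-in for the real per-file work. Pool workers are
    # numbered from 1, so _identity[0] - 1 gives a 0-based bar slot
    # (undocumented attribute; an assumption, not a guaranteed API)
    pos = current_process()._identity[0] - 1
    for _ in tqdm(range(10), desc=fn, position=pos, leave=False):
        sleep(0.2)
    return True


if __name__ == "__main__":
    freeze_support()  # harmless on POSIX; needed for frozen Windows apps
    files = [f"file-{i}.mp4" for i in range(20)]
    tqdm.set_lock(RLock())  # shared lock so bars don't clobber each other
    with Pool(4, initializer=tqdm.set_lock, initargs=(tqdm.get_lock(),)) as pool:
        pool.map(process, files)

With NPROC workers this gives one line per worker updating in place, which is essentially the "Core 0 ... Core 7" layout from the question.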

References

https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool
https://tqdm.github.io/

Upvotes: 1
