user1578720

Reputation: 17

Using Python's concurrent.futures to process objects in parallel

I just started using Python 3's concurrent.futures library to apply a number of functions to a list of images, in order to process and reshape them. The functions are resize(height, width) and opacity(number).

I also have an images() function that yields file-like objects, so I tried this code to process my images in parallel:

import concurrent.futures
from mainfile import images
from mainfile import shape


def parallel_image_processing():
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future = executor.submit(images)
        for fileobject in future.result():
            future1 = executor.submit(shape.resize, fileobject, "65", "85")
            future2 = executor.submit(shape.opacity, fileobject, "0.5")

Could somebody tell me if I am on the right path to accomplish this?

Upvotes: 0

Views: 3365

Answers (1)

dano

Reputation: 94981

I would recommend making images just return a path, rather than an open file object:

def images():
    ...
    yield os.path.join(image_dir[0], filename)

And then using this:

import concurrent.futures
from functools import partial

def open_and_call(func, filename, args=(), kwargs={}):
    # Open the file inside the worker process, then call the real
    # image-processing function on the open file object.
    with open(filename, 'rb') as f:
        return func(f, *args, **kwargs)

def parallel_image_processing():
    resize_func = partial(open_and_call, shape.resize, args=("65", "85"))
    opacity_func = partial(open_and_call, shape.opacity, args=("0.5",))
    img_list = list(images())
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        results1 = executor.map(resize_func, img_list)
        results2 = executor.map(opacity_func, img_list)

        # executor.map returns iterators rather than Future objects, so
        # consume them here to wait for every task and surface any errors.
        list(results1)
        list(results2)


if __name__ == "__main__":
    # Make sure the entry point to the function that creates the executor 
    # is inside an `if __name__ == "__main__"` guard if you're on Windows.
    parallel_image_processing()

If you're using CPython (as opposed to an alternative implementation without a GIL, like Jython), you don't want to use ThreadPoolExecutor, because image processing is CPU-intensive; due to the GIL, only one thread can run Python code at a time in CPython, so you won't actually do anything in parallel if you use threads for this use case. Instead, use ProcessPoolExecutor, which uses processes instead of threads and avoids the GIL altogether. Note that this is also why I recommended not returning file-like objects from images: you can't pass an open file handle to the worker processes. You have to open the files in the workers instead.
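As a quick aside that isn't part of the original answer: the underlying reason is that everything passed to a ProcessPoolExecutor worker must be pickled, and open file objects aren't picklable. A minimal sketch:

import pickle

with open(__file__, 'rb') as f:
    pickle.dumps(f)  # raises TypeError because file objects can't be pickled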

To do this, we have our executor call a little shim function (open_and_call), which will open the file in the worker process, and then call the resize/opacity functions with the correct arguments.
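For example, a single hypothetical call (with 'photo.png' standing in for one of the paths yielded by images()) would look like this:

# Opens photo.png inside the worker and runs shape.resize on the open file.
open_and_call(shape.resize, 'photo.png', args=("65", "85"))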

I'm also using executor.map instead of executor.submit, so that we can call resize/opacity for every item returned by images() without an explicit for loop. I use functools.partial to make it easier to call a function taking multiple arguments with executor.map (which only allows you to call functions that take a single argument).
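To illustrate the partial/map combination with a minimal, self-contained sketch (not part of the original answer, using a made-up scale function):

import concurrent.futures
from functools import partial

def scale(value, factor):
    return value * factor

if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # Bind the extra argument up front so map only has to supply one item.
        double = partial(scale, factor=2)
        print(list(executor.map(double, [1, 2, 3])))  # prints [2, 4, 6]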

There's also no need to call images() in the executor, since you're going to wait for its results before continuing anyway. Just call it like a normal function. I convert the generator object returned by images() to a list prior to calling map, as well. If you're concerned about memory usage, you can call images() directly in each map call, but if not, it's probably faster to just call images() once and store it as a list.
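If memory does become a concern, a variant along these lines (just a sketch, assuming images() can safely be called twice) skips the intermediate list:

with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
    # Each map call consumes its own fresh generator of paths.
    results1 = list(executor.map(resize_func, images()))
    results2 = list(executor.map(opacity_func, images()))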

Upvotes: 3
