Rayamon

Reputation: 344

Execute method in each object in list in parallel

I have a list of objects, and I want to execute a method in each object in parallel. The method modifies the attributes of the objects. For example:

class Object:
    def __init__(self, a):
        self.a = a
    def aplus(self):
        self.a += 1

object_list = [Object(1), Object(2), Object(3)]

# I want to execute this in parallel
for i in range(len(object_list)):
    object_list[i].aplus() 

I tried the following:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

executor = ProcessPoolExecutor(max_workers=3)
res = executor.map([obj.aplus for obj in object_list])

Which does not work, leaving the objects unaltered. I assume it's because the objects can only be copied, and not accessed, with multiprocessing. Any idea?

Thanks a lot!

EDIT: Suppose the objects are very big, so it would be preferable to avoid copying them to each process. Also suppose the methods are very CPU-intensive, so multiple processes rather than threads should be used. Under these conditions I believe there is no solution, as multiprocessing cannot share memory and threads cannot use multiple CPUs. I would like to be shown wrong, though.

Upvotes: 5

Views: 1697

Answers (3)

Rob Bricheno

Reputation: 4653

I assume it's because the objects can only be copied, and not accessed, with multiprocessing.

This is exactly right, and is half the answer. Because the processes are isolated, they each have their own copy of object_list. One solution here is to use ThreadPoolExecutor instead (the threads all share the same object_list).

The syntax is a bit different from what you were trying, but this works as intended:

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=3)
res = executor.map(Object.aplus, object_list)
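
After the map completes, the objects in object_list have been modified in place. A quick check (consuming the map generator first so that every call has finished):

list(res)

for obj in object_list:
    print(obj.a)  # prints 2, 3 and 4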

If you really want to use ProcessPoolExecutor then you'll need to get the data back from the processes somehow. The easiest way is to use functions which return values:

from concurrent.futures import ProcessPoolExecutor


class Object:
    def __init__(self, a):
        self.a = a

    def aplus(self):
        self.a += 1
        return self.a


if __name__ == '__main__':

    object_list = [Object(1), Object(2), Object(3)]

    executor = ProcessPoolExecutor(max_workers=3)
    for result in executor.map(Object.aplus, object_list):
        print("I got: " + str(result))

You can even have the function you are mapping return self, and put the returned objects back into your object_list at the end. So the full multiprocessing solution would look like this:

from concurrent.futures import ProcessPoolExecutor


class Object:
    def __init__(self, a):
        self.a = a

    def aplus(self):
        self.a += 1
        return self


if __name__ == '__main__':

    object_list = [Object(1), Object(2), Object(3)]

    executor = ProcessPoolExecutor(max_workers=3)
    object_list = list(executor.map(Object.aplus, object_list))
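
Afterwards object_list holds the updated copies returned by the worker processes. As a quick check, you can add this at the end of the __main__ block:

    for obj in object_list:
        print(obj.a)  # prints 2, 3 and 4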

Upvotes: 1

Cyphall

Reputation: 368

Here is my answer, using threading:

from threading import Thread

class Object:
    def __init__(self, a):
        self.a = a
    def aplus(self):
        self.a += 1

object_list = [Object(1), Object(2), Object(3)]

# A list containing all threads we will create
threads = []

# Create a thread for every object
for obj in object_list:
    thread = Thread(target=obj.aplus)
    thread.daemon = True
    thread.start()
    threads.append(thread)

# Wait for all threads to finish before continuing
for thread in threads:
    thread.join()

# Print the results
for obj in object_list:
    print(obj.a)
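
This prints 2, 3 and 4, since the threads share memory and modify the objects in place.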

Upvotes: 2

Rafael

Reputation: 7242

Here is a working example using Pool.map:

import multiprocessing

class Object:
    def __init__(self, a):
        self.a = a

    def aplus(self):
        self.a += 1

    def __str__(self):
        return str(self.a)

def worker(obj):
    obj.aplus()
    return obj

if __name__ == "__main__":
    object_list = [Object(1), Object(2), Object(3)]

    try:
        processes = multiprocessing.cpu_count()
    except NotImplementedError:
        processes = 2

    pool = multiprocessing.Pool(processes=processes)
    modified_object_list = pool.map(worker, object_list)

    for obj in modified_object_list:
        print(obj)

Prints:

2
3
4

Upvotes: 2
