Reputation: 344
I have a list of objects, and I want to execute a method in each object in parallel. The method modifies the attributes of the objects. For example:
class Object:
    def __init__(self, a):
        self.a = a

    def aplus(self):
        self.a += 1

object_list = [Object(1), Object(2), Object(3)]

# I want to execute this in parallel
for i in range(len(object_list)):
    object_list[i].aplus()
I tried the following:
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

executor = ProcessPoolExecutor(max_workers=3)
res = executor.map([obj.aplus for obj in object_list])
Which does not work, leaving the objects unaltered. I assume it's because the objects can only be copied, and not accessed, with multiprocessing. Any idea?
Thanks a lot!
EDIT: Supposedly the objects are very big, so it would be preferable to avoid copying them to each process. The methods are also supposedly very CPU intensive, so multiple processes rather than threads should be used. Within these conditions, I believe there is no solution, as multiprocessing cannot share memory and threads cannot use multiple CPUs. I would like to be shown wrong though.
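(One partial counterexample to "multiprocessing cannot share memory": for attributes that reduce to simple C types, multiprocessing's shared ctypes let child processes mutate the same storage. A minimal sketch of that idea; SharedObject is a hypothetical rewrite of the class above, and it only helps when the state fits in ctypes values, not for arbitrary big objects:)

```python
from multiprocessing import Process, Value

class SharedObject:
    # Hypothetical variant of the question's class: the attribute lives in
    # shared memory (a ctypes int), so child processes mutate the same
    # storage instead of a per-process copy.
    def __init__(self, a):
        self.a = Value('i', a)  # 'i' = signed C int, with an internal lock

    def aplus(self):
        with self.a.get_lock():  # guard the read-modify-write
            self.a.value += 1

def run_demo():
    object_list = [SharedObject(1), SharedObject(2), SharedObject(3)]
    procs = [Process(target=obj.aplus) for obj in object_list]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    # The parent sees the increments made in the child processes
    return [obj.a.value for obj in object_list]

if __name__ == '__main__':
    print(run_demo())  # [2, 3, 4]
```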
Upvotes: 5
Views: 1697
Reputation: 4653
I assume it's because the objects can only be copied, and not accessed, with multiprocessing.
This is exactly right, and is half the answer. Because the processes are isolated, they each have their own copy of object_list. One solution here is to use ThreadPoolExecutor (the threads all share the same object_list).
The syntax to use it is a bit different from what you are trying to use, but this works as intended:
executor = ThreadPoolExecutor(max_workers=3)
res = executor.map(Object.aplus, object_list)
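Put together as a self-contained script (same class as in the question), this shows that the threads really do mutate the original objects in place:

```python
from concurrent.futures import ThreadPoolExecutor

class Object:
    def __init__(self, a):
        self.a = a

    def aplus(self):
        self.a += 1

object_list = [Object(1), Object(2), Object(3)]

# Threads share the process's memory, so each call mutates the
# original object. Consuming the lazy map iterator with list() also
# surfaces any exception raised inside a worker.
with ThreadPoolExecutor(max_workers=3) as executor:
    list(executor.map(Object.aplus, object_list))

print([obj.a for obj in object_list])  # [2, 3, 4]
```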
If you really want to use ProcessPoolExecutor then you'll need to get the data back from the processes somehow. The easiest way is to use functions which return values:
from concurrent.futures import ProcessPoolExecutor

class Object:
    def __init__(self, a):
        self.a = a

    def aplus(self):
        self.a += 1
        return self.a

if __name__ == '__main__':
    object_list = [Object(1), Object(2), Object(3)]
    executor = ProcessPoolExecutor(max_workers=3)
    for result in executor.map(Object.aplus, object_list):
        print("I got: " + str(result))
You can even have the function you are mapping return self, and put those returned objects back into your object_list at the end. So the full multiprocessing solution would look like:
from concurrent.futures import ProcessPoolExecutor

class Object:
    def __init__(self, a):
        self.a = a

    def aplus(self):
        self.a += 1
        return self

if __name__ == '__main__':
    object_list = [Object(1), Object(2), Object(3)]
    executor = ProcessPoolExecutor(max_workers=3)
    object_list = list(executor.map(Object.aplus, object_list))
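One caveat worth noting with this pattern: executor.map sends back new pickled copies, so any reference taken before the call still points at an old, unmodified object. A small sketch of that pitfall (demo is an illustrative helper, not part of the answer above):

```python
from concurrent.futures import ProcessPoolExecutor

class Object:
    def __init__(self, a):
        self.a = a

    def aplus(self):
        self.a += 1
        return self

def demo():
    object_list = [Object(1), Object(2), Object(3)]
    first = object_list[0]  # keep a reference to an original object
    with ProcessPoolExecutor(max_workers=3) as executor:
        object_list = list(executor.map(Object.aplus, object_list))
    # The workers operated on pickled copies: the original object is
    # untouched, while the rebound list holds the updated copies.
    return first.a, object_list[0].a

if __name__ == '__main__':
    print(demo())  # (1, 2)
```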
Upvotes: 1
Reputation: 368
Here is my answer, using threading:
from threading import Thread

class Object:
    def __init__(self, a):
        self.a = a

    def aplus(self):
        self.a += 1

object_list = [Object(1), Object(2), Object(3)]

# A list containing all threads we will create
threads = []

# Create a thread for every object
for obj in object_list:
    thread = Thread(target=obj.aplus)
    thread.daemon = True
    thread.start()
    threads.append(thread)

# Wait for all threads to finish before continuing
for thread in threads:
    thread.join()

# Print the results
for obj in object_list:
    print(obj.a)
Upvotes: 2
Reputation: 7242
Here is a working example using Pool.map:
import multiprocessing

class Object:
    def __init__(self, a):
        self.a = a

    def aplus(self):
        self.a += 1

    def __str__(self):
        return str(self.a)

def worker(obj):
    obj.aplus()
    return obj

if __name__ == "__main__":
    object_list = [Object(1), Object(2), Object(3)]

    try:
        processes = multiprocessing.cpu_count()
    except NotImplementedError:
        processes = 2

    pool = multiprocessing.Pool(processes=processes)
    modified_object_list = pool.map(worker, object_list)

    for obj in modified_object_list:
        print(obj)
Prints:
2
3
4
Upvotes: 2