Reputation: 862
I am trying to use multiprocessing to (i) read in data, (ii) do some analyses, and (iii) save the output/results of those analyses as an instance of a custom class. The function that does the analyses takes multiple arguments, which suggests that starmap from the multiprocessing module should do the trick.
However, even though I pass unique (non-repeating) arguments to the function, the results (i.e., the class instances) are sometimes duplicated and/or missing.
Here is an example of some boiled down code that illustrates my question/issue:
import numpy as np
from multiprocessing import Pool
# create simple class
class EgClass:
pass
# define function to do analysis
def fun(eg_class, val):
eg_class.val = val
return eg_class
if __name__ == "__main__":
# create a list of unique inputs
unique_vals = list(np.arange(12))
# instantiate the example class
inst = EgClass()
# create a list of inputs
inputs = list(zip(np.repeat(inst, len(unique_vals)), unique_vals))
# apply the inputs to the function via the pool
p = Pool(processes=2)
results = p.starmap(fun, inputs)
p.close()
result_vals = [i.val for i in results]
# notice the result values repeat (not unique) and do not match the unique_vals
print(result_vals)
print(unique_vals)
Interestingly, if you decrease the number of unique values (in this case, changing unique_vals to unique_vals = list(np.arange(7))), the code works as I would expect; i.e., the duplicate values only crop up once the list of inputs grows beyond a certain length.
I've looked here: Python multiprocessing pool creating duplicate lists. But I believe that post was about wanting to create duplicate lists by sharing information across processes, which is more or less the opposite of what I am trying to do :)
Finally, forgive my naïveté. I am new to multiprocessing and there is a good chance I am missing something obvious.
Upvotes: 1
Views: 1052
Reputation: 18614
Let's first run this code serially, without multiprocessing.
from itertools import starmap

results = list(starmap(fun, inputs))
result_vals = [i.val for i in results]
# [11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11]
Notice that you only ever create one EgClass instance in your entire code (you call EgClass() only once). np.repeat simply repeats references to that same object; it does not create new objects, so every call is modifying the same instance. See Facts and myths about Python names and values.
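To see this directly, here is a small check (not part of your original code) that inspects what np.repeat returns:

import numpy as np

class EgClass:
    pass

inst = EgClass()
repeated = np.repeat(inst, 3)

# every element is a reference to the one and only instance
print([id(obj) for obj in repeated])         # all three ids are identical
print(all(obj is inst for obj in repeated))  # True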
Now for the multiprocessing part: let's modify the code to print the id of each returned object instead, and let's also set the chunksize parameter of starmap explicitly.
results = p.starmap(fun, inputs, chunksize=2)
result_id = [id(i) for i in results]
# [3066122542240, 3066122542240, 3066122541184, 3066122541184, 3066122540896, 3066122540896, 3066122541232, 3066122541232, 3066122541376, 3066122541376, 3066122483072, 3066122483072]
This means the worker process knew that the two objects sent over in one chunk were the same object, because pickle resolves multiple references to the same object within a single payload. So with chunksize=2, each pair of arguments was pickled together and came back as one shared object instead of two; but since only two arguments are pickled at a time, only two results can share an id.
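As a standalone illustration of that pickle behavior (not part of the question's code): references pickled together in one payload come back as one shared object, while separate payloads produce separate copies.

import pickle

class EgClass:
    pass

inst = EgClass()

# two references to the same object, pickled in a single payload (like one chunk)
a, b = pickle.loads(pickle.dumps((inst, inst)))
print(a is b)  # True - the shared reference survives the round trip

# the same object pickled in two separate payloads (like two different chunks)
c = pickle.loads(pickle.dumps(inst))
d = pickle.loads(pickle.dumps(inst))
print(c is d)  # False - each payload yields its own copy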
We can change how many results share an id by changing the chunksize parameter; for example, we'd get the behavior you expect with chunksize=1, but that would be relying on a pickle-specific implementation detail. The proper way to ensure all objects have different ids is to actually create different objects to begin with, or to make deep copies:
inputs = list(zip([EgClass() for x in range(len(unique_vals))], unique_vals))
# result_vals = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
This actually creates different objects, each with its own id and its own internal attributes, so we are no longer depending on pickle's implementation details.
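If you would rather keep the single inst from your example, deep copies work too; a minimal sketch using copy.deepcopy (assuming the object can be deep-copied):

import copy

# one independent copy of inst per value, so nothing is shared between calls
inputs = list(zip([copy.deepcopy(inst) for _ in unique_vals], unique_vals))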
Keep in mind that the arguments and return values in multiprocessing are pickled copies of the original objects, so any change made to them in a worker does not translate into a change to the originals; you would have to use Manager objects to propagate such changes.
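If you do need worker-side changes to be visible in the parent, here is a minimal sketch using a Manager proxy (the update_shared function and the shared dict are illustrative, not your original fun):

from multiprocessing import Pool, Manager

def update_shared(shared, val):
    # mutations of the managed dict are forwarded to the manager process,
    # so the parent sees them once the pool is done
    shared[val] = val * 2

if __name__ == "__main__":
    with Manager() as manager:
        shared = manager.dict()
        with Pool(processes=2) as p:
            p.starmap(update_shared, [(shared, v) for v in range(5)])
        print(dict(shared))  # e.g. {0: 0, 1: 2, 2: 4, 3: 6, 4: 8}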
Upvotes: 1