Reputation: 23
I want to change some of the values in a large NumPy array in parallel by leveraging multiprocessing.
That is to say, I want to end up with [[100, 100, 100], [100, 100, 100]].
However, the following code does not work; it fails with "RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance".
What should I do? Thanks.
import numpy as np
import multiprocessing
from multiprocessing import RawArray, Array
def change_array(array, i, j):
    """Set element ``[i, j]`` of the shared 2x3 array to 100 and print the buffer.

    ``array`` must provide ``get_obj()`` returning a buffer that holds six
    float64 values (for example a ``multiprocessing.Array('d', 6)``).
    """
    # View the buffer as a 2x3 float64 matrix; np.frombuffer does not copy,
    # so the assignment below writes straight into the underlying memory.
    X_np = np.frombuffer(array.get_obj(), dtype=np.float64).reshape(2, 3)
    X_np[i, j] = 100
    # Show the flat contents of the buffer after the update.
    print(np.frombuffer(array.get_obj()))
if __name__ == '__main__':
    X_shape = (2, 3)
    data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
    X = Array('d', X_shape[0] * X_shape[1])
    # Wrap X as a NumPy array so we can easily manipulate its data.
    X_np = np.frombuffer(X.get_obj()).reshape(X_shape)
    # Copy data to our shared array.
    np.copyto(X_np, data)
    pool = multiprocessing.Pool(processes=3)
    result = []
    for i in range(2):
        for j in range(3):
            # NOTE: passing the synchronized Array X as a task argument is
            # what triggers "RuntimeError: SynchronizedArray objects should
            # only be shared between processes through inheritance".
            result.append(pool.apply_async(change_array, (X, i, j,)))
    result = [r.get() for r in result]
    pool.close()
    pool.join()
    print(np.frombuffer(X.get_obj()).reshape(2, 3))
Upvotes: 2
Views: 4432
Reputation: 44313
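You need to make two changes:
1. Use a multiprocessing.Array instance with locking (actually, the default) rather than a "plain" Array.
2. Do not pass the array as an argument to the worker function; instead, hand it to each pool process once through the pool's initializer, which stores it in a global variable.
import numpy as np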
import multiprocessing
from multiprocessing import RawArray, Array
def initpool(arr):
    """Pool initializer: store ``arr`` in the module-level global ``array``.

    Intended to be passed as ``initializer=initpool, initargs=(X,)`` so that
    tasks running in the worker can reach the shared array without receiving
    it as a task argument.
    """
    global array
    array = arr
def change_array(i, j):
    """Set element ``[i, j]`` of the global shared 2x3 array to 100.

    Reads the module-level ``array`` (an object whose ``get_obj()`` returns a
    buffer of six float64 values) and prints the flat buffer after the update.
    """
    # View the buffer as a 2x3 float64 matrix; np.frombuffer does not copy,
    # so the assignment below writes straight into the underlying memory.
    X_np = np.frombuffer(array.get_obj(), dtype=np.float64).reshape(2, 3)
    X_np[i, j] = 100
    # Show the flat contents of the buffer after the update.
    print(np.frombuffer(array.get_obj()))
if __name__ == '__main__':
    X_shape = (2, 3)
    data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
    X = multiprocessing.Array('d', X_shape[0] * X_shape[1], lock=True)
    # Wrap X as a NumPy array so we can easily manipulate its data.
    X_np = np.frombuffer(X.get_obj()).reshape(X_shape)
    # Copy data to our shared array.
    np.copyto(X_np, data)
    # The shared array is handed to each worker once, via the initializer,
    # instead of being passed as an argument with every task.
    pool = multiprocessing.Pool(processes=3, initializer=initpool, initargs=(X,))
    result = []
    for i in range(2):
        for j in range(3):
            result.append(pool.apply_async(change_array, (i, j,)))
    # Wait for every task to finish (and surface any worker exception).
    result = [r.get() for r in result]
    pool.close()
    pool.join()
    print(np.frombuffer(X.get_obj()).reshape(2, 3))
Prints:
[100. 2.2 3.3 4.4 5.5 6.6]
[100. 100. 3.3 4.4 5.5 6.6]
[100. 100. 100. 4.4 5.5 6.6]
[100. 100. 100. 100. 5.5 6.6]
[100. 100. 100. 100. 100. 6.6]
[100. 100. 100. 100. 100. 100.]
[[100. 100. 100.]
[100. 100. 100.]]
Update
Since in this case the values being changed in the data array do not depend on the existing values in that array, there is no need for function change_array to have access to the array; it can instead, as suggested by Frank Yellin, just return a tuple of the indices to be changed together with the new value. But I did want to show you how you would pass the array for those situations where the function did need to access/modify the array. In this instance, however, the following code is all that you need (I have made a few simplifications):
import numpy as np
import multiprocessing
def change_array(i, j):
    """Return ``(i, j, 100)``: the indices to update and the new value.

    The worker does not touch the array itself; the caller applies the
    returned value to element ``[i, j]``.
    """
    return i, j, 100
if __name__ == '__main__':
    data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
    with multiprocessing.Pool(processes=3) as pool:
        result = [pool.apply_async(change_array, (i, j)) for i in range(2) for j in range(3)]
        # Collect the results while the pool is still open; leaving the
        # ``with`` block shuts the pool down.
        for r in result:
            i, j, value = r.get()
            # Only the main process writes to the array.
            data[i, j] = value
    print(data)
Or:
import numpy as np
import multiprocessing
import itertools
def change_array(t):
    """Return ``(i, j, 100)`` for the index pair ``t == (i, j)``.

    Takes a single tuple argument so it can be used directly with
    ``pool.map`` over an iterable of index pairs.
    """
    i, j = t
    return i, j, 100
if __name__ == '__main__':
    data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
    with multiprocessing.Pool(processes=3) as pool:
        # itertools.product yields every (i, j) index pair of the 2x3 array;
        # pool.map returns the workers' (i, j, value) tuples.
        for i, j, value in pool.map(change_array, itertools.product(range(2), range(3))):
            # Only the main process writes to the array.
            data[i, j] = value
    print(data)
Upvotes: 7
Reputation: 11330
The most important rule of multiprocessing: you do not want to be modifying a shared object in your subprocesses if at all possible. You want your worker function to be:
def change_array(i, j):
    """Worker sketch: compute the new value for element ``[i, j]`` and return it.

    Returns the tuple ``(i, j, value)`` so the main process can perform the
    actual assignment.
    """
    value = ...  # placeholder: whatever value goes here
    return i, j, value
Your main process would then read the values i, j, value that are returned and set the corresponding element of the array to the right value.
Upvotes: 2