SenYan

Reputation: 23

Python multiprocessing when sharing a numpy array

I want to modify the values in a large numpy array in parallel by leveraging multiprocessing.

That is to say, I want to get [[100, 100, 100], [100, 100, 100]] in the end.

However, the following code is wrong; it raises "RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance".

What should I do? Thanks.

import numpy as np
import multiprocessing

from multiprocessing import RawArray, Array


def change_array(array, i, j):
    X_np = np.frombuffer(array.get_obj(), dtype=np.float64).reshape(2, 3)
    X_np[i, j] = 100
    print(np.frombuffer(array.get_obj()))

if __name__ == '__main__':
    X_shape = (2, 3)
    data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
    X = Array('d', X_shape[0] * X_shape[1])
    # Wrap X as a numpy array so we can easily manipulate its data.
    X_np = np.frombuffer(X.get_obj()).reshape(X_shape)
    # Copy data to our shared array.
    np.copyto(X_np, data)

    pool = multiprocessing.Pool(processes=3)

    result = []
    for i in range(2):
        for j in range(3):
            result.append(pool.apply_async(change_array, (X, i, j,)))

    result = [r.get() for r in result]
    pool.close()
    pool.join()

    print(np.frombuffer(X.get_obj()).reshape(2, 3))

Upvotes: 2

Views: 4432

Answers (2)

Booboo

Reputation: 44313

You need to make two changes:

  1. Use a multiprocessing.Array instance with locking (actually the default) rather than a "plain" Array.
  2. Do not pass the array instance as an argument to your worker function. Instead, initialize each process in your pool with the array as a global variable.
import numpy as np
import multiprocessing

from multiprocessing import RawArray, Array


def initpool(arr):
    global array
    array = arr

def change_array(i, j):
    X_np = np.frombuffer(array.get_obj(), dtype=np.float64).reshape(2, 3)
    X_np[i, j] = 100
    print(np.frombuffer(array.get_obj()))

if __name__ == '__main__':
    X_shape = (2, 3)
    data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
    X = multiprocessing.Array('d', X_shape[0] * X_shape[1], lock=True)
    # Wrap X as a numpy array so we can easily manipulate its data.
    X_np = np.frombuffer(X.get_obj()).reshape(X_shape)
    # Copy data to our shared array.
    np.copyto(X_np, data)

    pool = multiprocessing.Pool(processes=3, initializer=initpool, initargs=(X,))

    result = []
    for i in range(2):
        for j in range(3):
            result.append(pool.apply_async(change_array, (i, j,)))

    result = [r.get() for r in result]
    pool.close()
    pool.join()

    print(np.frombuffer(X.get_obj()).reshape(2, 3))

Prints:

[100.    2.2   3.3   4.4   5.5   6.6]
[100.  100.    3.3   4.4   5.5   6.6]
[100.  100.  100.    4.4   5.5   6.6]
[100.  100.  100.  100.    5.5   6.6]
[100.  100.  100.  100.  100.    6.6]
[100. 100. 100. 100. 100. 100.]
[[100. 100. 100.]
 [100. 100. 100.]]

Update

Since in this case the new values do not depend on the existing values in the array, function change_array does not need access to the array at all; as Frank Yellin suggests, it can instead just return a tuple of the indices to be changed along with the new value. I wanted to show how to pass the array for those situations where the function genuinely needs to access or modify it, but for this particular problem the following code (with a few simplifications) is all you need:

import numpy as np
import multiprocessing


def change_array(i, j):
    return i, j, 100

if __name__ == '__main__':
    data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
    with multiprocessing.Pool(processes=3) as pool:
        result = [pool.apply_async(change_array, (i, j)) for i in range(2) for j in range(3)]
        for r in result:
            i, j, value = r.get()
            data[i, j] = value
        print(data)

Or:

import numpy as np
import multiprocessing
import itertools


def change_array(t):
    i, j = t
    return i, j, 100

if __name__ == '__main__':
    data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
    with multiprocessing.Pool(processes=3) as pool:
        for i, j, value in pool.map(change_array, itertools.product(range(2), range(3))):
            data[i, j] = value
        print(data)

Upvotes: 7

Frank Yellin

Reputation: 11330

The most important rule of multiprocessing: do not modify a shared object in your subprocesses if at all possible. You want your worker function to be:

def change_array(i, j):
    value = ...  # whatever value goes here
    return i, j, value

Your main process would then read each returned i, j, value triple and set the corresponding array element to the right value.

Upvotes: 2
