tomasyany
tomasyany

Reputation: 1242

Shared arrays in multiprocessing Python

I'm trying to write to the same shared array in a parallel-processing Python script.

When I do it outside a class, in a normal script, everything works right. But when I try to do it through a class (using the same code), I get the
RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance.

My script is the following (without a class):

import numpy
import ctypes

from multiprocessing import Pool, Array, cpu_count

# Side length of the (n x n) cost matrix.
n = 2

# Lock-protected shared buffer of C doubles, created at module level BEFORE
# the Pool starts, so worker processes obtain it by inheritance rather than
# by pickling (pickling a SynchronizedArray raises RuntimeError).
# NOTE(review): this relies on the Pool forking the parent process -- TODO
# confirm it still works on platforms/start-methods that spawn instead.
total_costs_matrix_base = Array(ctypes.c_double, n*n)
# NumPy view over the same shared memory (no copy), reshaped to (n, n).
total_costs_matrix = numpy.ctypeslib.as_array(
                     total_costs_matrix_base.get_obj())
total_costs_matrix = total_costs_matrix.reshape(n,n)


# The def_param default keeps a reference to the shared array in the
# function object; the actual writes go through the module-global view.
def set_total_costs_matrix( i, j, def_param = total_costs_matrix_base):
    total_costs_matrix[i,j] = i * j

if __name__ == "__main__":

    pool = Pool(processes=cpu_count())
    iterable = []

    # Strict upper-triangle index pairs only (j > i); the diagonal and
    # lower triangle are never written.
    for i in range(n):
        for j in range(i+1,n):
            iterable.append((i,j))
    pool.starmap(set_total_costs_matrix, iterable)
    # Pickle the filled matrix to disk (numpy.ndarray.dump).
    total_costs_matrix.dump('some/path/to/file')

That script works well. The one that doesn't is the following (which uses a class):

import numpy
import ctypes

from multiprocessing import Pool, Array, cpu_count

class CostComputation(object):
    """Computes the cost matrix.

    NOTE(review): this version fails with
    "RuntimeError: SynchronizedArray objects should only be shared between
    processes through inheritance" (see the traceback below): pool.starmap
    must pickle the bound method self.set_total_costs_matrix, which pickles
    self, which in turn tries to pickle the SynchronizedArray attribute --
    and SynchronizedArray refuses to be pickled.
    """

    def __init__(self):
        # Side length of the (n x n) matrix.
        self.n = 2

        # Storing the SynchronizedArray on the instance is what breaks
        # pool.starmap below -- it gets dragged into the task pickle.
        self.total_costs_matrix_base = Array(ctypes.c_double, self.n*self.n)
        self.total_costs_matrix = numpy.ctypeslib.as_array(
                             self.total_costs_matrix_base.get_obj())
        self.total_costs_matrix = self.total_costs_matrix.reshape(self.n,self.n)


    def set_total_costs_matrix(self, i, j, def_param = None):
        # Rebinding the local def_param here has no effect on how the array
        # reaches the workers (unlike the default-argument trick used in
        # the module-level version).
        def_param = self.total_costs_matrix_base
        self.total_costs_matrix[i,j] = i * j


    def write_cost_matrix(self):
        pool = Pool(processes=cpu_count())
        iterable = []

        # Strict upper-triangle index pairs only (j > i).
        for i in range(self.n):
            for j in range(i+1,self.n):
                iterable.append((i,j))
        # Raises RuntimeError: pickling the bound method pickles self and
        # therefore the SynchronizedArray attribute (see traceback).
        pool.starmap(self.set_total_costs_matrix, iterable)
        self.total_costs_matrix.dump('some/path/to/file')

After this, I would call write_cost_matrix from another file, after creating an instance of CostComputation.

I read this answer but still couldn't solve my problem.

I'm using Python 3.4.2 in a Mac OSX Yosemite 10.10.4.

EDIT
When using the class CostComputation, the script I'm using is:

from cost_computation import CostComputation

# Build the cost matrix and dump it to disk.
cc = CostComputation()
# The method is named write_cost_matrix (singular "cost", as defined in the
# class and shown in the traceback); the misspelled write_costs_matrix
# would raise AttributeError before multiprocessing is even reached.
cc.write_cost_matrix()

The whole error is:

Traceback (most recent call last):
  File "app.py", line 65, in <module>
    cc.write_cost_matrix()
  File "/path/to/cost_computation.py", line 75, in write_cost_matrix
    pool.starmap(self.set_total_costs_matrix, iterable)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 268, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 599, in get
    raise self._value
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 383, in _handle_tasks
    put(task)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/connection.py", line 206, in send
    self._send_bytes(ForkingPickler.dumps(obj))
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/reduction.py", line 50, in dumps
    cls(buf, protocol).dump(obj)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/sharedctypes.py", line 192, in __reduce__
    assert_spawning(self)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/context.py", line 347, in assert_spawning
    ' through inheritance' % type(obj).__name__
RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance

Upvotes: 3

Views: 1884

Answers (2)

Booboo
Booboo

Reputation: 44108

As an aside, Array(ctypes.c_double, n*n) creates a synchronized array that has an underlying multiprocessing.Lock instance that can be obtained by calling get_lock() on the array instance. This should be used if you need to implement operations on the array as a critical section. In your case, this is unnecessary and you could instead initialize self.total_costs_matrix_base with the more efficient Array(ctypes.c_double, n*n, lock=False).

Unfortunately, if you are using a multiprocessing pool and a multiprocessing.Array as a class attribute, you will get the exception you experienced. A solution is to use a multiprocessing.shared_memory.SharedMemory instance:

import numpy as np

from multiprocessing import Pool, cpu_count, shared_memory, current_process

class SharedNumpyArray:
    """Wrap a numpy array in a multiprocessing SharedMemory segment.

    Only the segment name, shape, and dtype travel through pickle, so an
    instance can be handed to pool workers cheaply; each process attaches
    to the same underlying memory via the ``arr`` property.
    """

    def __init__(self, arr):
        self._shape = arr.shape
        self._dtype = arr.dtype
        # Lazily-attached handle for this process (see ``arr``).
        self._shm = None
        # pid of the creating process; only it may unlink the segment.
        self._creator = current_process().pid

        # Allocate the shared segment and copy the source data into it.
        self._acquired_shm = shared_memory.SharedMemory(create=True, size=arr.nbytes)
        self._name = self._acquired_shm.name
        _arr = np.ndarray(shape=self._shape, dtype=self._dtype, buffer=self._acquired_shm.buf)
        _arr[:] = arr[:]

    def __getstate__(self):
        # When this instance is pickled to another process, drop
        # _acquired_shm: that handle is process-local and SharedMemory
        # objects are not picklable.
        if '_acquired_shm' in self.__dict__:
            state = self.__dict__.copy()
            del state['_acquired_shm']
        else:
            state = self.__dict__
        return state

    @property
    def arr(self):
        """Return a numpy view over the shared segment.

        Fix: the original opened a brand-new SharedMemory handle on every
        access and overwrote self._shm, leaking the previously attached
        handle each time. Attach once and reuse it.
        """
        if self._shm is None:
            self._shm = shared_memory.SharedMemory(name=self._name)
        return np.ndarray(shape=self._shape, dtype=self._dtype, buffer=self._shm.buf)

    def __del__(self):
        # Detach this process's handle (does not destroy the segment).
        if self._shm:
            self._shm.close()

    def close_and_unlink(self):
        """Called only by the process that created this instance."""

        if current_process().pid != self._creator:
            raise RuntimeError('Only the creating process may call close_and_unlink')

        if self._shm:
            self._shm.close()
            # Prevent __del__ from trying to close _shm again
            self._shm = None

        if self._acquired_shm:
            self._acquired_shm.close()
            self._acquired_shm.unlink()
            # Make additional call to this method a no-op:
            self._acquired_shm = None

class CostComputation(object):
    """Computes the cost matrix in parallel via a shared-memory array."""

    def __init__(self):
        # Side length of the (n x n) matrix.
        self.n = 4
        shape = (self.n, self.n)
        dtype = np.float64
        arr = np.zeros(shape, dtype=dtype)
        # Backing store shared with the pool workers; only the segment
        # name/shape/dtype are pickled when self is shipped to a worker.
        self._shared_array = SharedNumpyArray(arr=arr)

    def set_total_costs_matrix(self, i, j):
        """Worker task: write one cell of the shared matrix."""
        # Attach to the shared memory and get a numpy view over it:
        total_costs_matrix = self._shared_array.arr

        total_costs_matrix[i, j] = i * j

    def write_cost_matrix(self):
        """Fill the matrix in parallel, print it, and release the memory."""
        iterable = [(i, j) for i in range(self.n) for j in range(self.n)]

        # Fix: the original never called close()/join() on the pool; the
        # context manager guarantees the workers are torn down on exit.
        with Pool(processes=cpu_count()) as pool:
            pool.starmap(self.set_total_costs_matrix, iterable)

        # Attach in the parent and show the result:
        arr = self._shared_array.arr
        print(arr)
        # Fix: drop our view before closing -- SharedMemory.close() raises
        # BufferError while an ndarray still exports the underlying buffer.
        del arr
        # We are through with shared memory:
        self._shared_array.close_and_unlink()

if __name__ == '__main__':
    # Build and print the cost matrix, then release the shared memory.
    CostComputation().write_cost_matrix()

Upvotes: 0

ATOzTOA
ATOzTOA

Reputation: 35950

Try creating a second class which contains the shared data only. Then use that class object in your main class.

Upvotes: 0

Related Questions