James Adams

Reputation: 8737

Python multiprocessing: Shared memory (numpy) Array not being modified as expected

I have written a small multiprocessing program in Python which reads an array of values and runs multiple processes asynchronously to operate on parts of the data array. Each separate process should operate on its own 1-D section of the 2-D array, with no overlap between processes. Once all the processes have completed, the shared memory array is written out to file, but at this point in my code the expected/computed values are not present in the shared memory array; the original values are still there. It seems that the assignment of new values within the processes did not stick to the shared memory object. Perhaps there's something going on that I'm not understanding (for example, pass by reference vs. pass by value) which is causing my trouble?
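
To illustrate what I mean, below is a minimal standalone sketch (not my actual code; all names here are made up) of the copy-on-pickle behavior I suspect: a numpy array put onto a multiprocessing queue is pickled on the way in, so the worker modifies its own private copy rather than the parent's array:

import numpy as np
from multiprocessing import Process, Queue

def worker(queue):
    array = queue.get()  # unpickled here, so this is a copy of the parent's array
    array[:] = 42.0      # modifies only the worker's private copy

if __name__ == '__main__':
    data = np.zeros(4)
    queue = Queue()
    process = Process(target=worker, args=(queue,))
    process.start()
    queue.put(data)      # the array is pickled into the queue here
    process.join()
    print(data)          # still all zeros in the parent process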

I have a Processor class which creates a number of worker processes and instantiates a JoinableQueue. The function that is called by each process operates on an indexed slice of the 2-D shared memory array and updates those array values in place, so the input (shared memory) array should have all of its values replaced by the results of the computation, and there should be no need for a second results array. The main function passes the shared memory array and an index value as arguments to the compute function; these are added to a queue from which the process objects consume items of work. The code is below:

import ctypes
import logging
import sys
from datetime import datetime
from multiprocessing import Array, JoinableQueue, Process

import netCDF4
import numpy as np

# module-level logger (configuration not shown)
logger = logging.getLogger(__name__)


class Processor:

    queue = None
                 
    def __init__(self, 
                 number_of_workers=1):
              
        # create a joinable queue
        self.queue = JoinableQueue()
        
        # create the processes
        self.processes = [Process(target=self.compute) for _ in range(number_of_workers)]
        for p in self.processes:
            p.start()
                 
    def add_work_item(self, item):
         
        # add the parameters list to the parameters queue
        self.queue.put(item)
 
    def compute(self):
         
        while True:
              
            # get a list of arguments from the queue
            arguments = self.queue.get()
              
            # a None item is the sentinel that tells this worker to exit
            if arguments is None:
                break
  
            # process the arguments here
            data = arguments[0]
            index = arguments[1] 
                 
            # only process non-empty grid cells, i.e. data array contains at least some non-NaN values
            if (isinstance(data[:, index], np.ma.MaskedArray) and data[:, index].mask.all()) or np.isnan(data[:, index]).all():
             
                pass         
                  
            else:  # we have some valid values to work with
             
                logger.info('Processing latitude: {}'.format(index))
             
                # perform a fitting to gamma     
                results = do_something(data[:, index])
 
                # update the shared array
                data[:, index] = results

            # indicate that the task has completed
            self.queue.task_done()
 
    def terminate(self):
 
        # terminate all processes
        for p in self.processes:
            p.terminate()

    def wait_on_all(self):
 
        # wait until the queue is empty
        self.queue.join()

#-----------------------------------------------------------------------------------------------------------------------
if __name__ == '__main__':

    try:
        
        # log some timing info, used later for elapsed time 
        start_datetime = datetime.now()
        logger.info("Start time:    {}".format(start_datetime, '%x'))
        
        # get the command line arguments
        input_file = sys.argv[1]
        input_var_name = sys.argv[2]
        output_file_base = sys.argv[3]
        month_scale = int(sys.argv[4])

        # create the variable name from the indicator, distribution, and month scale
        variable_name = 'spi_gamma_{}'.format(str(month_scale).zfill(2))

        # open the NetCDF files
        with netCDF4.Dataset(input_file) as input_dataset, \
            netCDF4.Dataset(output_file_base + '_' + variable_name + '.nc', 'w') as output_dataset:
             
            # read info (including time_size, lat_size, and lon_size) from the input dataset
            # and initialize the output for writing (not shown)

            # create a processor with a number of worker processes
            number_of_workers = 1
            processor = Processor(number_of_workers)
            
            # for each longitude slice
            for lon_index in range(lon_size):
    
                logger.info('\n\nProcessing longitude: {}\n'.format(lon_index))

                # read the longitude slice into a data array     
                longitude_slice = input_dataset.variables[input_var_name][:, lon_index, :]
                
                # reshape into a 1-D array
                original_shape = longitude_slice.shape
                flat_longitude_slice = longitude_slice.flatten()
                
                # convert the array onto a shared memory array which can be accessed from within another process
                shared_array_base = Array(ctypes.c_double, flat_longitude_slice)
                shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
                shared_array = shared_array.reshape(original_shape)
                
                # loop over each latitude point in the longitude slice
                for lat_index in range(lat_size):
                    
                    # have the processor process the shared array at this index
                    arguments = [shared_array, lat_index]
                    processor.add_work_item(arguments)
                    
                # block here until all queued work items have been processed
                processor.wait_on_all()
    
                                                 
                # write the fitted longitude slice values into the output NetCDF
                output_dataset.variables[variable_name][:, lon_index, :] = np.reshape(shared_array, (time_size, 1, lat_size))
    
            # all processes have completed
            processor.terminate()
        
    except Exception:
        logger.error('Failed to complete', exc_info=True)
        raise

Where am I going wrong, i.e. why are the values of the shared memory array not being updated as I'm expecting?

Update

I have this working now for a single process, but when I try to spawn multiple processes I get a pickle error:

pickle.PicklingError: Can't pickle '_subprocess_handle' object: <_subprocess_handle object at 0x00000000021CF9F0>

This occurs when the second process is started, within the Processor.__init__() function. If I run the code below using a single process (number_of_workers = 1) then I do not encounter this error and my code runs as expected, although it does not utilize multiple processors, which is a goal.

class Processor:
     
    queue = None
                 
    def __init__(self, 
                 shared_array,
                 data_shape,
                 number_of_workers=1):
              
        # create a joinable queue
        self.queue = JoinableQueue()
        
        # keep reference to shared memory array
        self.shared_array = shared_array
        self.data_shape = data_shape
        
        # create the processes
        self.processes = [Process(target=self.compute_indicator) for _ in range(number_of_workers)]
        for p in self.processes:
            p.start()
                 
    def add_work_item(self, item):
         
        # add the parameters list to the parameters queue
        self.queue.put(item)
 
    def compute_indicator(self):
         
        while True:
              
            # get a list of arguments from the queue
            arguments = self.queue.get()
              
            # a None item is the sentinel that tells this worker to exit
            if arguments is None:
                break
  
            # process the arguments here
            index = arguments[0] 
                 
            # turn the shared array into a numpy array     
            data = np.ctypeslib.as_array(self.shared_array)
            data = data.reshape(self.data_shape)
                
            # only process non-empty grid cells, i.e. data array contains at least some non-NaN values
            if (isinstance(data[:, index], np.ma.MaskedArray) and data[:, index].mask.all()) \
                or np.isnan(data[:, index]).all() or (data[:, index] < 0).all():
             
                pass         
                  
            else:  # we have some valid values to work with
             
                logger.info('Processing latitude: {}'.format(index))
             
                # perform computation     
                fitted_values = do_something(data[:, index])
 
                # update the shared array
                data[:, index] = fitted_values
                
            # indicate that the task has completed
            self.queue.task_done()
 
    def terminate(self):
 
        # terminate all processes
        for p in self.processes:
            p.terminate()

    def wait_on_all(self):
 
        # wait until the queue is empty
        self.queue.join()

#-----------------------------------------------------------------------------------------------------------------------
if __name__ == '__main__':

            # NOT SHOWN: the same try/with setup as in the original listing above,
            # including reading time_size, lat_size, and lon_size from the input dataset

            # create a shared memory array which can be accessed from within another process
            shared_array_base = Array(ctypes.c_double, time_size * lat_size, lock=False)
                
            # create a processor with a number of worker processes
            number_of_workers = 4
            data_shape = (time_size, lat_size)
            processor = Processor(shared_array_base, data_shape, number_of_workers)

            # for each longitude slice
            for lon_index in range(lon_size):
    
                logger.info('\n\nProcessing longitude: {}\n'.format(lon_index))

                # get the shared memory array and convert into a numpy array with proper dimensions
                longitude_array = np.ctypeslib.as_array(shared_array_base)
                longitude_array = np.reshape(longitude_array, data_shape)

                # read the longitude slice into the shared memory array     
                longitude_array[:] = input_dataset.variables[input_var_name][:, lon_index, :]
                
                # loop over each latitude point in the longitude slice
                for lat_index in range(lat_size):
                    
                    # have the processor process the shared array at this index
                    processor.add_work_item([lat_index])
                        
                # block here until all queued work items have been processed
                processor.wait_on_all()
    
                # get the longitude slice of fitted values from the shared memory array and convert  
                # into a numpy array with proper dimensions which we can then use to write to NetCDF
                fitted_array = np.ctypeslib.as_array(shared_array_base)
                fitted_array = np.reshape(fitted_array, (time_size, 1, lat_size))
                                                 
                # write the longitude slice of computed values into the output NetCDF
                output_dataset.variables[variable_name][:, lon_index, :] = fitted_array
    
            # all processes have completed
            processor.terminate()
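
For reference, below is a minimal sketch (made-up names, not my actual code) of what I suspect is going on: on Windows each Process pickles its target when started, and a bound method drags the whole instance along with it, including the handles of any worker processes that have already been started:

from multiprocessing import Process

class Owner:

    def __init__(self, number_of_workers):
        self.processes = []
        for _ in range(number_of_workers):
            # the target is a bound method, so self is pickled along with it,
            # including self.processes and any already-started process handles
            worker = Process(target=self.work)
            worker.start()
            self.processes.append(worker)

    def work(self):
        pass

if __name__ == '__main__':
    Owner(2)  # expected to raise a PicklingError on Windows for the second worker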

Upvotes: 1

Views: 1627

Answers (1)

James Adams

Reputation: 8737

I have now successfully implemented a solution, although it still exhibits unexpected behavior: 1) it runs on all CPUs in a Windows environment, but the total elapsed time is no faster than a single-processor run (i.e. the same code without any of the multiprocessing.* usage), and 2) when I run the code in a Linux environment (a virtual container) I see only one of four CPUs being utilized. In any event, I now have working code that utilizes a shared memory array, which is what the original question was about. If anyone can see where I'm going wrong, leading to the two issues mentioned above, then please follow up in the comments.

import ctypes
import numpy as np
from multiprocessing import Array, Pool, cpu_count

def compute(args):

    # extract the arguments
    lon_index = args[0]
    lat_index = args[1]

    # NOT SHOWN
    # get the data_shape value

    # turn the shared array into a numpy array
    data = np.ctypeslib.as_array(shared_array)
    data = data.reshape(data_shape)

    # perform the computation, update the indexed array slice
    data[:, lon_index, lat_index] = perform_computation(data[:, lon_index, lat_index])  

def init_process(array):

    # make the shared array available as a global in each worker process
    global shared_array
    shared_array = array


if __name__ == '__main__':

    # NOT SHOWN
    # get the lat_size, lon_size, time_size, lon_stride, and data_shape values

    # create a shared memory array which can be accessed from within another process
    shared_array = Array(ctypes.c_double, time_size * lon_stride * lat_size, lock=False)
    data_shape = (time_size, lon_stride, lat_size)

    # create a processor with a number of worker processes
    number_of_workers = cpu_count()

    # create a Pool, essentially forking with the shared array made available to each pooled/forked process
    pool = Pool(processes=number_of_workers,
                initializer=init_process,
                initargs=(shared_array,))  # initargs must be a tuple, hence the trailing comma

    # for each slice of longitudes
    for lon_index in range(0, lon_size, lon_stride):

        # convert the shared memory array into a numpy array with proper dimensions
        slice_array = np.ctypeslib.as_array(shared_array)
        slice_array = np.reshape(slice_array, data_shape)

        # read the longitude slice into the shared memory array
        slice_array[:] = read_data(data_shape)

        # build the list of arguments we'll map to the processes of the pool,
        # one item per (longitude within the stride, latitude) pair
        arguments_iterable = []
        for lat_index in range(lat_size):
            for i in range(lon_stride):
                arguments_iterable.append([i, lat_index])

        # map the arguments iterable to the compute function, which updates
        # the shared array in place within the pooled worker processes
        pool.map(compute, arguments_iterable)

        # get the slice of fitted values from the shared memory array and convert
        # into a numpy array with proper dimensions which we can then use to write to NetCDF
        fitted_array = np.ctypeslib.as_array(shared_array)
        fitted_array = np.reshape(fitted_array, (time_size, lon_stride, lat_size))

        # NOT SHOWN
        # write the slice of computed values to file

    # all slices have been processed, close the pool
    pool.close()
    pool.join()
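
For anyone who wants something runnable end to end, below is a condensed, self-contained sketch of the same pattern (the 4 x 8 shape and the doubling computation are just placeholders): the raw shared Array is handed to each worker through the Pool initializer, and because every task writes to its own disjoint column, using lock=False is safe here:

import ctypes
import numpy as np
from multiprocessing import Array, Pool, cpu_count

def init_process(array):
    # make the shared array available as a global in each worker process
    global shared_array
    shared_array = array

def compute(index):
    # wrap the shared memory in a numpy view and update one column in place
    data = np.ctypeslib.as_array(shared_array).reshape(4, 8)
    data[:, index] *= 2.0

if __name__ == '__main__':

    # create the shared memory array and fill it with initial values
    shared_array_base = Array(ctypes.c_double, 4 * 8, lock=False)
    array = np.ctypeslib.as_array(shared_array_base).reshape(4, 8)
    array[:] = np.arange(32, dtype=np.float64).reshape(4, 8)

    # each pooled worker gets the shared array via the initializer
    pool = Pool(processes=cpu_count(),
                initializer=init_process,
                initargs=(shared_array_base,))
    pool.map(compute, range(8))
    pool.close()
    pool.join()

    print(array)  # every value should now be doubled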

Upvotes: 1
