Reputation: 8737
I have written a small multiprocessing program in Python which reads an array of values and runs multiple processes asynchronously to operate on parts of the data array. Each process is meant to work on its own 1-D section of the 2-D array, with no overlap between processes. Once all the processes have completed, the shared memory array is written out to file, but at that point in my code the expected/computed values are not present in the shared memory array; the original values are still there. It seems that the assignment of new values within the processes did not stick to the shared memory object. Perhaps there is something going on that I'm not understanding (for example, pass by reference vs. pass by value) which is causing my trouble?
I have a Processor class which creates a number of worker processes and instantiates a JoinableQueue. The function called by each process operates on an indexed slice of the 2-D shared memory array and updates those array values in place, so the input (shared memory) array should have all of its values replaced by the results of the computation, with no need for a second results array. The main function passes the shared memory array and an index value as arguments to the compute function, and these are added to a queue from which the process objects consume items of work. The code is below:
class Processor:

    def __init__(self, number_of_workers=1):

        # create a joinable queue from which the worker processes will consume work items
        self.queue = JoinableQueue()

        # create and start the worker processes
        self.processes = [Process(target=self.compute) for _ in range(number_of_workers)]
        for p in self.processes:
            p.start()

    def add_work_item(self, item):

        # add the arguments list to the work queue
        self.queue.put(item)

    def compute(self):

        while True:

            # get a list of arguments from the queue
            arguments = self.queue.get()

            # a None item is the sentinel signalling that there's no more work
            if arguments is None:
                break

            # unpack the arguments
            data = arguments[0]
            index = arguments[1]

            # only process non-empty grid cells, i.e. data array contains at least some non-NaN values
            if (isinstance(data[:, index], np.ma.MaskedArray) and data[:, index].mask.all()) \
                    or np.isnan(data[:, index]).all():
                pass
            else:  # we have some valid values to work with

                logger.info('Processing latitude: {}'.format(index))

                # perform a fitting to gamma
                results = do_something(data[:, index])

                # update the shared array (in place, or so I expect)
                data[:, index] = results

            # indicate that the task has completed
            self.queue.task_done()

    def terminate(self):

        # terminate all worker processes
        for p in self.processes:
            p.terminate()

    def wait_on_all(self):

        # block until the queue has been emptied
        self.queue.join()
#-----------------------------------------------------------------------------------------------------------------------
if __name__ == '__main__':

    try:

        # log the start time, used later for computing elapsed time
        start_datetime = datetime.now()
        logger.info("Start time: {}".format(start_datetime))

        # get the command line arguments
        input_file = sys.argv[1]
        input_var_name = sys.argv[2]
        output_file_base = sys.argv[3]
        month_scale = int(sys.argv[4])

        # create the variable name from the indicator, distribution, and month scale
        variable_name = 'spi_gamma_{}'.format(str(month_scale).zfill(2))

        # open the NetCDF files
        with netCDF4.Dataset(input_file) as input_dataset, \
             netCDF4.Dataset(output_file_base + '_' + variable_name + '.nc', 'w') as output_dataset:

            # read info from the input dataset and initialize the output for writing

            # create a processor with a number of worker processes
            number_of_workers = 1
            processor = Processor(number_of_workers)

            # for each longitude slice
            for lon_index in range(lon_size):

                logger.info('\n\nProcessing longitude: {}\n'.format(lon_index))

                # read the longitude slice into a data array
                longitude_slice = input_dataset.variables[input_var_name][:, lon_index, :]

                # flatten into a 1-D array, remembering the original 2-D shape
                original_shape = longitude_slice.shape
                flat_longitude_slice = longitude_slice.flatten()

                # copy the data into a shared memory array which can be accessed from within other processes
                shared_array_base = Array(ctypes.c_double, flat_longitude_slice)
                shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
                shared_array = shared_array.reshape(original_shape)

                # loop over each latitude point in the longitude slice
                for lat_index in range(lat_size):

                    # have the processor process the shared array at this index
                    arguments = [shared_array, lat_index]
                    processor.add_work_item(arguments)

                # block until all queued work items have completed
                processor.wait_on_all()

                # write the fitted longitude slice values into the output NetCDF
                output_dataset.variables[variable_name][:, lon_index, :] = \
                    np.reshape(shared_array, (time_size, 1, lat_size))

            # all work has completed, terminate the worker processes
            processor.terminate()

    except Exception:
        logger.error('Failed to complete', exc_info=True)
        raise
Where am I going wrong, i.e. why are the values of the shared memory array not being updated as I expect?
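To illustrate the pass by reference vs. pass by value suspicion mentioned above, here is a minimal, self-contained sketch (dummy data, nothing NetCDF-specific, all names illustrative) showing that anything put onto a multiprocessing queue is pickled, so the worker receives a copy of the numpy array rather than a view onto the shared memory, and its in-place writes never reach the parent's buffer:

import ctypes
from multiprocessing import Array, JoinableQueue, Process

import numpy as np

def worker(queue):

    # the array arriving via the queue has been pickled/unpickled,
    # so it is an independent copy, not a view of the shared memory
    data = queue.get()
    data[:] = 42.0  # modifies the worker's copy only
    queue.task_done()

if __name__ == '__main__':

    # a shared memory array wrapped as a numpy array, initially all zeros
    shared_array_base = Array(ctypes.c_double, 4)
    shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())

    queue = JoinableQueue()
    process = Process(target=worker, args=(queue,))
    process.start()

    queue.put(shared_array)
    queue.join()
    process.terminate()

    # still all zeros: the worker's writes never made it back
    print(shared_array)

This suggests that the queue and the raw shared Array should be handed to the workers at process creation time, so they are inherited rather than pickled per work item.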
I have this working now for a single process, but when I try to spawn multiple processes I get a pickle error:
pickle.PicklingError: Can't pickle '_subprocess_handle' object: <_subprocess_handle object at 0x00000000021CF9F0>
This occurs when the second process is started, within the Processor.__init__() function. If I run the code below using a single process (number_of_workers = 1) then I do not encounter this error and my code runs as expected, although without utilizing multiple processors, which is the goal.
class Processor:

    def __init__(self, shared_array, data_shape, number_of_workers=1):

        # create a joinable queue from which the worker processes will consume work items
        self.queue = JoinableQueue()

        # keep references to the shared memory array and its dimensions
        self.shared_array = shared_array
        self.data_shape = data_shape

        # create and start the worker processes
        self.processes = [Process(target=self.compute_indicator) for _ in range(number_of_workers)]
        for p in self.processes:
            p.start()

    def add_work_item(self, item):

        # add the arguments list to the work queue
        self.queue.put(item)

    def compute_indicator(self):

        while True:

            # get a list of arguments from the queue
            arguments = self.queue.get()

            # a None item is the sentinel signalling that there's no more work
            if arguments is None:
                break

            # unpack the arguments
            index = arguments[0]

            # wrap the shared array as a numpy array with the proper dimensions
            data = np.ctypeslib.as_array(self.shared_array)
            data = data.reshape(self.data_shape)

            # only process non-empty grid cells, i.e. data array contains at least some valid values
            if (isinstance(data[:, index], np.ma.MaskedArray) and data[:, index].mask.all()) \
                    or np.isnan(data[:, index]).all() or (data[:, index] < 0).all():
                pass
            else:  # we have some valid values to work with

                logger.info('Processing latitude: {}'.format(index))

                # perform computation
                fitted_values = do_something(data[:, index])

                # update the shared array
                data[:, index] = fitted_values

            # indicate that the task has completed
            self.queue.task_done()

    def terminate(self):

        # terminate all worker processes
        for p in self.processes:
            p.terminate()

    def wait_on_all(self):

        # block until the queue has been emptied
        self.queue.join()
#-----------------------------------------------------------------------------------------------------------------------
if __name__ == '__main__':

    # create a shared memory array which can be accessed from within other processes
    shared_array_base = Array(ctypes.c_double, time_size * lat_size, lock=False)

    # create a processor with a number of worker processes
    number_of_workers = 4
    data_shape = (time_size, lat_size)
    processor = Processor(shared_array_base, data_shape, number_of_workers)

    # for each longitude slice
    for lon_index in range(lon_size):

        logger.info('\n\nProcessing longitude: {}\n'.format(lon_index))

        # wrap the shared memory array as a numpy array with the proper dimensions
        longitude_array = np.ctypeslib.as_array(shared_array_base)
        longitude_array = np.reshape(longitude_array, data_shape)

        # read the longitude slice into the shared memory array
        longitude_array[:] = input_dataset.variables[input_var_name][:, lon_index, :]

        # loop over each latitude point in the longitude slice
        for lat_index in range(lat_size):

            # have the processor process the shared array at this index
            processor.add_work_item([lat_index])

        # block until all queued work items have completed
        processor.wait_on_all()

        # wrap the shared memory array of fitted values as a numpy array with
        # dimensions suitable for writing to the output NetCDF
        fitted_array = np.ctypeslib.as_array(shared_array_base)
        fitted_array = np.reshape(fitted_array, (time_size, 1, lat_size))

        # write the longitude slice of computed values into the output NetCDF
        output_dataset.variables[variable_name][:, lon_index, :] = fitted_array

    # all work has completed, terminate the worker processes
    processor.terminate()
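The pickle error appears to occur because Process(target=self.compute_indicator) has to pickle the Processor instance itself, and by the time the second process is started, self.processes already contains the first, already-started process, whose handle cannot be pickled. A sketch of one possible workaround (illustrative only, reusing the do_something placeholder from above) is to target a module-level function and pass the queue and the shared array to it explicitly, so that self is never pickled:

def compute_indicator(queue, shared_array, data_shape):

    # module-level worker: only the queue and the raw shared array are
    # pickled when each process is spawned, never the Processor instance
    while True:
        arguments = queue.get()
        if arguments is None:
            break
        index = arguments[0]
        data = np.ctypeslib.as_array(shared_array).reshape(data_shape)
        data[:, index] = do_something(data[:, index])
        queue.task_done()

class Processor:

    def __init__(self, shared_array, data_shape, number_of_workers=1):

        self.queue = JoinableQueue()

        # target a plain function instead of a bound method, so that starting
        # a process never attempts to pickle self and its live process handles
        self.processes = [Process(target=compute_indicator,
                                  args=(self.queue, shared_array, data_shape))
                          for _ in range(number_of_workers)]
        for p in self.processes:
            p.start()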
Upvotes: 1
Views: 1627
Reputation: 8737
I have now implemented a working solution, although it still exhibits two unexpected behaviors: 1) on Windows it runs on all CPUs, but the total elapsed time is no faster than running a single-processor job (i.e. the same code without any of the multiprocessing.* usage), and 2) when I run the code on Linux (in a virtual container) only one of the four CPUs is utilized. In any event, I now have working code that utilizes a shared memory array, which is what the original question was about. If anyone can see where I'm going wrong in a way that leads to the two issues mentioned above, please follow up in the comments.
def compute(args):

    # extract the arguments: the offset within the longitude stride and the latitude index
    lon_index = args[0]
    lat_index = args[1]

    # NOT SHOWN
    # get the data_shape value

    # wrap the shared array (set as a global by the pool initializer)
    # as a numpy array with the proper dimensions
    data = np.ctypeslib.as_array(shared_array)
    data = data.reshape(data_shape)

    # perform the computation, updating the indexed array slice in place
    data[:, lon_index, lat_index] = perform_computation(data[:, lon_index, lat_index])

def init_process(array):

    # put the shared array into the global namespace of each pooled process
    global shared_array
    shared_array = array

if __name__ == '__main__':

    # NOT SHOWN
    # get the lat_size, lon_size, time_size, lon_stride, and data_shape values

    # create a shared memory array which can be accessed from within other processes
    shared_array = Array(ctypes.c_double, time_size * lon_stride * lat_size, lock=False)
    data_shape = (time_size, lon_stride, lat_size)

    # create a Pool with a number of worker processes, essentially forking with
    # copies of the shared array going to each pooled/forked process
    number_of_workers = cpu_count()
    pool = Pool(processes=number_of_workers,
                initializer=init_process,
                initargs=(shared_array,))   # note the trailing comma: initargs must be a tuple

    # for each slice of longitudes
    for lon_index in range(0, lon_size, lon_stride):

        # wrap the shared memory array as a numpy array with the proper dimensions
        slice_array = np.ctypeslib.as_array(shared_array)
        slice_array = np.reshape(slice_array, data_shape)

        # read the longitude slice into the shared memory array
        slice_array[:] = read_data(data_shape)

        # build the list of arguments we'll map onto the processes of the pool,
        # one [stride offset, latitude index] pair per grid cell in the slice
        arguments_iterable = []
        for lat_index in range(lat_size):
            for i in range(lon_stride):
                arguments_iterable.append([i, lat_index])

        # map the arguments iterable to the compute function
        pool.map(compute, arguments_iterable)

        # wrap the shared memory array of fitted values as a numpy array with
        # proper dimensions which we can then use to write to NetCDF
        fitted_array = np.ctypeslib.as_array(shared_array)
        fitted_array = np.reshape(fitted_array, (time_size, lon_stride, lat_size))

        # NOT SHOWN
        # write the slice of computed values to file

    # all processing has completed, close the pool and wait for its workers to exit
    pool.close()
    pool.join()
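For anyone wanting to run the pattern above end-to-end, here is a self-contained minimal version, with dummy sizes, dummy input data, and a trivial square root standing in for the NOT SHOWN pieces and for perform_computation (all of those stand-ins are placeholders, not part of the real code):

import ctypes
from multiprocessing import Array, Pool, cpu_count

import numpy as np

# placeholder sizes standing in for the NOT SHOWN setup
time_size, lon_stride, lat_size = 10, 2, 5
data_shape = (time_size, lon_stride, lat_size)

def init_process(array):

    # runs once in each pooled process: stash the inherited shared array
    global shared_array
    shared_array = array

def compute(args):

    # wrap the shared array and update one indexed slice in place
    i, lat_index = args
    data = np.ctypeslib.as_array(shared_array).reshape(data_shape)
    data[:, i, lat_index] = np.sqrt(data[:, i, lat_index])  # trivial stand-in computation

if __name__ == '__main__':

    # the shared memory array, inherited by the pooled processes via initargs
    shared = Array(ctypes.c_double, time_size * lon_stride * lat_size, lock=False)
    slice_array = np.ctypeslib.as_array(shared).reshape(data_shape)
    slice_array[:] = 4.0  # dummy input data

    pool = Pool(processes=cpu_count(),
                initializer=init_process,
                initargs=(shared,))
    pool.map(compute,
             [[i, lat] for lat in range(lat_size) for i in range(lon_stride)])
    pool.close()
    pool.join()

    # every element should now be 2.0, written by the pooled processes
    print(np.ctypeslib.as_array(shared).reshape(data_shape)[0, 0, :])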
Upvotes: 1