Reputation: 317
I'm trying to use dask.array.map_blocks to process a dask array, using a second dask array with a different shape as an argument. The use case is first running peak finding on a 2-D stack of images (4 dimensions in total), which returns a 2-D dask array of dtype object. The first two dimensions of the two dask arrays are therefore the same. The peaks are then used to extract intensities from the 4-dimensional dataset. In the code below, I've omitted the peak-finding part. Dask version 1.0.0.
import numpy as np
import dask.array as da

def test_processing(data_chunk, position_chunk):
    # One output element per image: an array of the extracted intensities
    output_array = np.empty(data_chunk.shape[:-2], dtype='object')
    for index in np.ndindex(data_chunk.shape[:-2]):
        islice = np.s_[index]
        intensity_list = []
        data = data_chunk[islice]
        positions = position_chunk[islice]
        for x, y in positions:
            intensity_list.append(data[x, y])
        output_array[islice] = np.array(intensity_list)
    return output_array

data = da.random.random(size=(4, 4, 10, 10), chunks=(2, 2, 10, 10))

# Stand-in for the peak-finding result: five (x, y) positions per image
positions = np.empty(data.shape[:-2], dtype='object')
for index in np.ndindex(positions.shape):
    positions[index] = np.arange(10).reshape(5, 2)

data_output = da.map_blocks(test_processing, data, positions, dtype=object,
                            chunks=(2, 2), drop_axis=(2, 3))
data_output.compute()
This gives the error "ValueError: Can't drop an axis with more than 1 block. Please use atop instead.", which I'm guessing is due to positions having 2 dimensions, while data has 4 dimensions.
The same function, but without the positions array, works fine.
import numpy as np
import dask.array as da

def test_processing(data_chunk):
    output_array = np.empty(data_chunk.shape[:-2], dtype='object')
    for index in np.ndindex(data_chunk.shape[:-2]):
        islice = np.s_[index]
        intensity_list = []
        data = data_chunk[islice]
        # Hard-coded positions instead of a second array argument
        positions = [[5, 2], [1, 3]]
        for x, y in positions:
            intensity_list.append(data[x, y])
        output_array[islice] = np.array(intensity_list)
    return output_array

data = da.random.random(size=(4, 4, 10, 10), chunks=(2, 2, 10, 10))
data_output = da.map_blocks(test_processing, data, dtype=object,
                            chunks=(2, 2), drop_axis=(2, 3))
data_computed = data_output.compute()
Upvotes: 2
Views: 1082
Reputation: 317
This has been fixed in more recent versions of dask: running the same code on dask 2.3.0 works fine.
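For completeness, the error message points to atop, which later dask releases renamed to blockwise. Below is a minimal sketch of that route, not a tested drop-in for the original problem. It assumes a recent dask where da.blockwise is available, and it assumes positions is wrapped in a dask array chunked like the first two axes of data. The names extract_intensities, data_np and positions_np are made up for this illustration, and a fixed NumPy array replaces the random data so the result can be checked.

```python
import numpy as np
import dask.array as da


def extract_intensities(data_chunk, position_chunk):
    # Hypothetical helper: for each image in the chunk, read the
    # intensities at that image's (x, y) positions.
    out = np.empty(data_chunk.shape[:-2], dtype=object)
    for index in np.ndindex(out.shape):
        image = data_chunk[index]
        out[index] = np.array([image[x, y] for x, y in position_chunk[index]])
    return out


# Deterministic stand-in for the 4-D image stack
data_np = np.arange(4 * 4 * 10 * 10, dtype=float).reshape(4, 4, 10, 10)
data = da.from_array(data_np, chunks=(2, 2, 10, 10))

# Stand-in for the peak-finding output: five (x, y) positions per image
positions_np = np.empty(data_np.shape[:-2], dtype=object)
for index in np.ndindex(positions_np.shape):
    positions_np[index] = np.arange(10).reshape(5, 2)
positions = da.from_array(positions_np, chunks=(2, 2))

# "ijkl" labels the axes of data, "ij" the axes of positions and of the
# output. Axes k and l do not appear in the output, so they are dropped;
# concatenate=True hands the function a full array along those axes.
result = da.blockwise(
    extract_intensities, "ij",
    data, "ijkl",
    positions, "ij",
    dtype=object,
    concatenate=True,
)
computed = result.compute()
```

Labelling the axes explicitly means each block of positions is paired with the block of data that shares its i and j indices, rather than relying on how map_blocks lines up arrays with different numbers of dimensions.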
Upvotes: 1