jeoc

Reputation: 53

Compare value in a 2d array to nearby values

I'm looking for a way to compare each value in a 2D array to the values surrounding it and return which neighbours are close to the value of interest (within a threshold).

The approaches I've explored iterate through each element of the 2D array, but I suspect this is far from the fastest way to do it.

The input would be a 2D array (size: i x j), and the output would be two 3D arrays (k x i x j) where the "extra" dimension is there to store the i and j indices of the nearby elements that are within a threshold.
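To pin that specification down, here is a hedged brute-force sketch of the input/output contract (the function name and the slot ordering along the leading dimension are my own assumptions, not from the question): slot k holds the (i, j) index of the k-th footprint position if that neighbour is within the threshold, and NaN otherwise.

```python
import numpy as np

def neighbour_indices_bruteforce(arr, threshold=0.5, footprint=(3, 3)):
    """For each element, record the (i, j) indices of the footprint
    neighbours whose value is within `threshold`; NaN elsewhere."""
    k = footprint[0] * footprint[1]
    out_i = np.full((k, *arr.shape), np.nan)
    out_j = np.full((k, *arr.shape), np.nan)
    half_i, half_j = footprint[0] // 2, footprint[1] // 2
    for i in range(arr.shape[0]):
        for j in range(arr.shape[1]):
            n = 0  # footprint slot index, fixed per (di, dj) offset
            for di in range(-half_i, half_i + 1):
                for dj in range(-half_j, half_j + 1):
                    ii, jj = i + di, j + dj
                    in_bounds = 0 <= ii < arr.shape[0] and 0 <= jj < arr.shape[1]
                    if in_bounds and abs(arr[ii, jj] - arr[i, j]) <= threshold:
                        out_i[n, i, j] = ii
                        out_j[n, i, j] = jj
                    n += 1
    return out_i, out_j
```

For a 3x3 footprint on an i x j array this gives two (9, i, j) outputs; out-of-bounds and non-close slots stay NaN.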

Some code to illustrate what I am doing at the moment:

import numpy as np
from tqdm import tqdm

np.random.seed(seed=10)
arr = np.random.random((100, 100))  # Some 2D input array
threshold = 0.5

# Arrays for the row and col indices
i_all, j_all = np.mgrid[0:arr.shape[0],
                        0:arr.shape[1]]  

# Footprint around the current element (ie looking at the 8 elements around the central value). Must be odd.
footprint = (3, 3)  
footprint_size = np.prod(footprint)

# Prepare output for i and j indices
output_i = np.full((footprint_size, *arr.shape), np.nan)  
output_j = np.full((footprint_size, *arr.shape), np.nan)

for p, element in enumerate(tqdm(arr.flatten())):  # Iterate through each element

    i, j = np.unravel_index(p, arr.shape)

    # Create mask of elements to compare to
    mask = ((i_all >= (i - (footprint[0] - 1) / 2)) & 
            (i_all <= (i + (footprint[0] - 1) / 2)) & 
            (j_all >= (j - (footprint[1] - 1) / 2)) & 
            (j_all <= (j + (footprint[1] - 1) / 2)))

    # Create mask of those within the threshold
    close_mask = abs(arr[mask] - element) <= threshold

    if np.nansum(close_mask) < footprint_size:  # At the edges we need to pad to be able to index into the output arrays
        output_i[:, i, j] = np.pad(i_all[mask][close_mask].flatten().astype(float),
                                   (int(footprint_size - np.nansum(close_mask)), 0),
                                   mode='constant', constant_values=np.nan)
        output_j[:, i, j] = np.pad(j_all[mask][close_mask].flatten().astype(float),
                                   (int(footprint_size - np.nansum(close_mask)), 0),
                                   mode='constant', constant_values=np.nan)
    
    else:  # Don't need to pad here
        output_i[:, i, j] = i_all[mask][close_mask]
        output_j[:, i, j] = j_all[mask][close_mask]

# Output: two 3D arrays of indices corresponding to elements within the threshold of the element of interest for rows and cols

This works fine for small arrays but is very slow once arrays have ~10^6 elements. The other idea I had was sliding the array over itself to compare values. That might be faster, but I'm curious whether there are other ideas or built-in functions that can do something similar.
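One way to sketch the "sliding" idea with a built-in: `numpy.lib.stride_tricks.sliding_window_view` (NumPy >= 1.20) produces all 3x3 windows of a NaN-padded copy at once, so the threshold comparison becomes a single vectorized expression. The variable names below are my own, and this only demonstrates the comparison step, not the full index extraction.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

arr = np.random.default_rng(10).random((100, 100))
threshold = 0.5
footprint = (3, 3)

# NaN-pad so every element has a full 3x3 window (NaN never compares as close)
pad = ((footprint[0] // 2,) * 2, (footprint[1] // 2,) * 2)
padded = np.pad(arr, pad, mode='constant', constant_values=np.nan)

windows = sliding_window_view(padded, footprint)             # (100, 100, 3, 3)
close = np.abs(windows - arr[..., None, None]) <= threshold  # boolean, same shape
```

The boolean `close` array marks, for every element, which of its 3x3 window positions are within the threshold; the (i, j) index outputs could then be recovered with `np.where` on similarly padded index grids.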

Upvotes: 0

Views: 275

Answers (2)

jeoc

Reputation: 53

Using dankal444's answer I managed to get this working:

def slidingCompare(arr, footprint=(3, 3), threshold=0.5):
    """
          arr: 2D array | input array
    footprint: tuple    | search window dimensions (must be odd)
    threshold: float    | threshold for neighbours to count as close
    """
    import numpy as np

    assert footprint[0] % 2 == 1, "Footprint dimensions should be odd."
    assert footprint[1] % 2 == 1, "Footprint dimensions should be odd."
    
    temp_arr = np.full((arr.shape[0] + footprint[0] - 1, 
                        arr.shape[1] + footprint[1] - 1), np.nan)
    temp_arr[footprint[0] // 2:footprint[0] // 2 + arr.shape[0],
             footprint[1] // 2:footprint[1] // 2 + arr.shape[1]] = arr
    
    # Arrays for the row and col indices
    i_all, j_all = np.mgrid[-(footprint[0] // 2):arr.shape[0]+(footprint[0] // 2), 
                            -(footprint[1] // 2):arr.shape[1]+(footprint[1] // 2)]

    # Number of elements in the search window
    footprint_size = np.prod(footprint)

    # Prepare output for i and j indices
    output_i = np.full((footprint_size, *arr.shape), np.nan)
    output_j = np.full((footprint_size, *arr.shape), np.nan)

    output_ix = np.arange(footprint_size).reshape(footprint)
    
    for vert_pos in np.arange(footprint[0]):
        for horiz_pos in np.arange(footprint[1]):
            neighbour = temp_arr[vert_pos: vert_pos + arr.shape[0], 
                                 horiz_pos: horiz_pos + arr.shape[1]]
            close_mask = abs(arr - neighbour) <= threshold
            
            rows = slice(vert_pos, vert_pos + arr.shape[0])
            cols = slice(horiz_pos, horiz_pos + arr.shape[1])
            output_i[output_ix[vert_pos, horiz_pos], close_mask] = i_all[rows, cols][close_mask]
            output_j[output_ix[vert_pos, horiz_pos], close_mask] = j_all[rows, cols][close_mask]
            
    # Output: two 3D arrays of indices corresponding to elements within the threshold of the element of interest for rows and cols
    return output_i, output_j

Upvotes: 0

dankal444

Reputation: 4148

I do not know exactly where, but I am pretty sure your method has a bug: in your results, the last (100x100) subarrays have all indices present.

What I wrote gives results that look better and is ~1000x faster, but it still needs some testing on your side; I might have made an error.


import numpy as np

def faster_method(arr, threshold, footprint):

    temp_arr = np.full((arr.shape[0] + footprint[0] - 1, arr.shape[1] + footprint[1] - 1), np.nan)
    temp_arr[footprint[0] // 2: footprint[0] // 2 + arr.shape[0],
             footprint[1] // 2: footprint[1] // 2 + arr.shape[1]] = arr
    temp_i_all, temp_j_all = np.mgrid[-(footprint[0] // 2): arr.shape[0] + footprint[0] // 2,
                                      -(footprint[1] // 2): arr.shape[1] + footprint[1] // 2]

    footprint_size = np.prod(footprint)

    output_i = np.full((footprint_size, *arr.shape), np.nan)
    output_j = np.full((footprint_size, *arr.shape), np.nan)

    output_idx = 0
    for neighbour_vertical_position in range(footprint[0]):
        for neighbour_horizontal_position in range(footprint[1]):
            if neighbour_vertical_position == footprint[0] // 2 and neighbour_horizontal_position == footprint[1] // 2:
                # center point, not a neighbour, so we can keep np.nan for it everywhere
                output_idx += 1
                continue
            rows = slice(neighbour_vertical_position, neighbour_vertical_position + arr.shape[0])
            cols = slice(neighbour_horizontal_position, neighbour_horizontal_position + arr.shape[1])
            current_neighbour = temp_arr[rows, cols]
            current_i_all = temp_i_all[rows, cols]
            current_j_all = temp_j_all[rows, cols]
            # True where the neighbour is within the threshold (NaN padding compares False)
            is_close_array = np.abs(arr - current_neighbour) <= threshold
            # 0 / True -> 0.0 and 0 / False -> NaN, so only close neighbours keep their index
            with np.errstate(divide='ignore', invalid='ignore'):
                output_i[output_idx] = current_i_all + 0 / is_close_array
                output_j[output_idx] = current_j_all + 0 / is_close_array
            output_idx += 1

    return output_i, output_j
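For readers puzzled by the `0 / is_close_array` line: dividing zero by a boolean array yields 0.0 where the mask is True and NaN where it is False (0/0), so adding it to an index grid blanks out non-matching positions. A minimal demonstration, with `np.where` as the more explicit equivalent:

```python
import numpy as np

idx = np.array([[0., 1.], [2., 3.]])
mask = np.array([[True, False], [True, True]])

# 0 / True -> 0.0, 0 / False -> 0/0 -> NaN; errstate silences the expected warning
with np.errstate(divide='ignore', invalid='ignore'):
    kept = idx + 0 / mask

# Equivalent, and arguably clearer:
kept_where = np.where(mask, idx, np.nan)
```

Both give `[[0., nan], [2., 3.]]`; `np.where` avoids the suppressed division warning entirely.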

Upvotes: 1
