J. Leung

Reputation: 95

Efficient element-wise matching of two arrays in python

I am working on a matching problem. I have two arrays A and B of the same size (assume 1000x1000). For each element a_ij in A (where i and j are the row and column indices), I need to find the element in the i-th row of B that is closest in value to a_ij, and record its column index. For example, if a_ij = 0.5 and the i-th row of B is [0.3, 0.9], the output at (i, j) would be 0, the column index of 0.3. Right now, I have come up with three solutions:

  1. Nested loop (Takes 5.47s)
  2. Single loop with broadcasting (Takes 6.40s)
  3. Parallel loop (Takes 30.27s)

I think the above three methods are not time-efficient enough, or at least my implementations are not good enough (the third method takes far longer than I expected!). It would be nice if you could point out how I can improve my code. Thanks in advance!

My current solutions are as follows (a sketch of the timing harness follows the code):

import numpy as np
import time
from joblib import Parallel, delayed

def method1(A,B): # Nested loop (as a baseline)
    output = np.zeros(A.shape)
    for r in range(A.shape[0]):
        rowB = B[r]
        for c in range(A.shape[1]):
            elementA = A[r,c]
            diff_vector = np.abs(elementA - rowB)
            output[r,c] = np.argmin(diff_vector)
    return output

def method2(A,B): # Broadcasting
    output = np.zeros(A.shape)
    for r in range(A.shape[0]):
        diff_matrix = np.abs(A[r][:, np.newaxis] - B[r])
        output[r] = np.argmin(diff_matrix, axis=1)
    return output

def method3(A,B): # Parallel for loop
    def matcher(r, A, B):
        # Recover the 2D indices (i, j) from the flat index r
        i = r // A.shape[1]
        j = r % A.shape[1]

        elementA = A[i, j]
        rowB = B[i]
        diff_vector = np.abs(elementA - rowB)
        return np.argmin(diff_vector)

    output = Parallel(n_jobs=4)(delayed(matcher)(r, A, B) for r in range(A.shape[0]*A.shape[1]))
    output = np.reshape(output, [A.shape[0], A.shape[1]])
    return output

A = np.random.randn(1000,1000)
B = np.random.randn(1000,1000)
output1 = method1(A,B)
output2 = method2(A,B)
output3 = method3(A,B)
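
For reference, the timings quoted above can be reproduced with a simple wall-clock harness along these lines (a sketch; the exact numbers of course depend on the machine):

for method in (method1, method2, method3):
    start = time.perf_counter()
    method(A, B)
    print(f'{method.__name__}: {time.perf_counter() - start:.2f}s')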

Upvotes: 1

Views: 505

Answers (1)

Jérôme Richard

Reputation: 50288

The algorithm is clearly inefficient (in all three methods): scanning every item of rowB to find the closest one is expensive and results in several billion floating-point operations. Not to mention that NumPy creates an expensive, unnecessary temporary array for each elementA - rowB and np.abs call.

You can speed this up with a sort followed by binary searches. The time complexity of this approach is O(n**2 log n), versus O(n**3) for the initial approach. You can also write a parallel implementation of this algorithm easily using Numba. Moreover, you do not need to store the output in a float64-typed array: an integer array is enough (faster and possibly smaller).
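
To see the idea without the Numba machinery, here is a rough single-row version in plain NumPy (a sketch; the function name is illustrative, and ties in rowB may resolve to a different index than a linear argmin scan would pick):

import numpy as np

def closest_indices_one_row(rowA, rowB):
    order = np.argsort(rowB)                    # sort the row of B once: O(n log n)
    sorted_rowB = rowB[order]
    idxs = np.searchsorted(sorted_rowB, rowA)   # one binary search per query value
    # Each query lies between sorted_rowB[idxs-1] and sorted_rowB[idxs]:
    # keep whichever neighbour is closer (clamping at the boundaries).
    left = np.fabs(rowA - sorted_rowB[np.maximum(idxs - 1, 0)])
    right = np.fabs(rowA - sorted_rowB[np.minimum(idxs, len(rowB) - 1)])
    prev_idx_is_less = (idxs == len(rowB)) | (left < right)
    return order[idxs - prev_idx_is_less]       # map back to unsorted positions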

Here is the resulting parallel Numba implementation:

import numba as nb
import numpy as np

@nb.njit('int_[:,::1](float64[:,::1],float64[:,::1])', parallel=True)
def method4(A,B):
    mB = B.shape[1]
    output = np.empty(A.shape, dtype=np.int_)

    # Parallel loop
    for r in nb.prange(A.shape[0]):
        rowA = A[r]
        rowB = B[r]

        # Sort a row
        index_rowB = np.argsort(rowB)
        sorted_rowB = rowB[index_rowB]

        # Fast binary search in the sorted row
        # See: https://stackoverflow.com/a/46184652/12939557
        idxs = np.searchsorted(sorted_rowB, rowA)
        left = np.fabs(rowA - sorted_rowB[np.maximum(idxs-1, 0)])
        right = np.fabs(rowA - sorted_rowB[np.minimum(idxs, mB-1)])
        prev_idx_is_less = (idxs == mB) | (left < right)

        # Find the position in the original unsorted array
        output[r] = index_rowB[idxs - prev_idx_is_less]

    return output

output4 = method4(A,B)
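
As a quick sanity check, the index-based result should agree with the baseline (a sketch; with continuous random data, exact ties within a row of B are essentially impossible, but if they did occur the two methods could legitimately pick different indices):

# output1 holds the same column indices, just stored as floats
assert np.array_equal(output1, output4)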

This is 482 times faster than the fastest initial implementation (method1) on my 10-core machine:

method1:  4341 ms
method2:  5839 ms
method4:     9 ms

Upvotes: 1
