MBV
MBV

Reputation: 630

optimize numpy python function to get orthogonal distance

I have 3 arrays (x_array, y_array, p_array), the first two correspond to 2d arrays with coordinates points of random points, the third is an flatten array of points corresponding to lines.

I need to calculate the minimum orthogonal distance for each x_array, y_array point to the lines form by p_array points.

For this I have written two functions that uses mostly numpy. The first function creates a boolean mask to get which points have an orthogonal projection to the lines, this produces a 2d array with True and False values. The second function calculates the orthogonal distance to the line or the orthogonal projection of intersection with the points.

This process works but it is part of a GUI where this processes is time sensitive. I would like to speed up this function but my knowledge of numpy am programing in general are not enough to improve this. I have already written this using numba but it gets an small reduction in from 1.68 s to 1.09 for the first function.

Goal

My goal is to reduce the time of both function under 1s, I do not know is this is possible. I would appreciate any help

Results

mask:  1.6823785305023193
mask numba:  1.0962464809417725
orto distance:  1.630366563796997

Code

data:

https://drive.google.com/file/d/1GHDSH1ycQSiBJk8RvkTNiE_RoetwCZw6/view?usp=sharing https://drive.google.com/file/d/1k8wCop1fePEh7ANfkScQ_BHLGRVo-aJe/view?usp=sharing https://drive.google.com/file/d/1swhg1BXBh18xyuueOKUNNx3SVGW8Jfq9/view?usp=sharing

import  numpy as np
import time



def ortoSegmentPoint_all(p_array, x_array, y_array):
    """
    :param p_array: np.array([[ 718898.941  9677612.901 ], [ 718888.8227 9677718.305 ], [ 719033.0528 9677770.692 ]])
    :param y_array: np.array([9677656.39934991 9677720.27550726 9677754.79])
    :param x_array: np.array([718895.88881594 718938.61392781 718961.46])
    :return: [POINT, LINE] indexes where point is orthogonal to line segment
    """
    # PENDIENTE "m" de la recta, y = mx + n
    m_array = np.divide(y_array[:, 0] - y_array[:, 1], x_array[:, 0] - x_array[:, 1])
    # PENDIENTE INVERTIDA, 1/m
    inv_m_array = np.divide(1, m_array)
    # VALOR "n", y = mx + n
    n_array = y_array[:, 0] - x_array[:, 0] * m_array
    # VALOR "n_orto" PARA LA RECTA PERPENDICULAR
    _len_p_array = len(p_array)
    n_orto_array = np.reshape(p_array[:, 1], (_len_p_array, 1)) + inv_m_array * np.reshape(p_array[:, 0], (_len_p_array, 1))
    # PUNTOS DONDE SE INTERSECTAN DE FORMA PERPENDICULAR
    x_intersec_array = np.divide(n_orto_array - n_array, m_array + inv_m_array)
    y_intersec_array = m_array * x_intersec_array + n_array
    # LISTAR COORDENADAS EN PARES
    # FILAS: NUMERO DE PUNTOS, COLUMNAS: NUMERO DE TRAMOS
    maskX = np.where(np.logical_and(x_intersec_array < np.max(x_array, axis=1), x_intersec_array > np.min(x_array, axis=1)), True, False)
    maskY = np.where(np.logical_and(y_intersec_array < np.max(y_array, axis=1), y_intersec_array > np.min(y_array, axis=1)), True, False)
    mask = maskY * maskX
    return mask

def ortoDistancePointLine_indx(x_array, y_array, p_array, index_point, mask):
    ' a=(yA−yB), b=(xB−xA) and c=xAyB−xByA '
    a, b = np.array([x_array[:, 0], y_array[:, 0]]), np.array([x_array[:, 1], y_array[:, 1]])
    a0, b0, c0 = a[1, :] - b[1, :], b[0, :] - a[0, :], a[0, :] * b[1, :] - b[0, :] * a[1, :]
    index_point = np.unique(index_point)
    _len = len(p_array[index_point])
    _x = p_array[index_point][:, 0].reshape(_len, 1)
    _y = p_array[index_point][:, 1].reshape(_len, 1)
    _a0, _b0, _c0 = np.full(shape=(_len, a0.size), fill_value=a0), np.full(shape=(_len, b0.size),  fill_value=b0), np.full(shape=(_len, c0.size), fill_value=c0)
    _d0 = np.abs(_a0 * _x + _b0 * _y + _c0)
    _d1 = np.sqrt((_a0 * _a0) + (_b0 * _b0))
    _d = np.divide(_d0, _d1) * mask[index_point, :]
    _d = np.where(_d == 0, 1000000.0, _d)
    return _d, index_point


p_array = np.load('p_array.npy')
x_array = np.load('x_array.npy')
y_array = np.load('y_array.npy')


#CREATE BOOLEAN MASK

t0 = time.time()
mask = ortoSegmentPoint_all(p_array, x_array, y_array)
print('mask: ', time.time() - t0)

# ORTHOGONAL DISTANCE
rows, cols = np.where(mask)

t0 = time.time()
dist_orto, idx_orto = ortoDistancePointLine_indx(x_array, y_array, p_array, rows, mask)
print('orto distance: ', time.time() - t0)

Upvotes: 1

Views: 286

Answers (1)

J&#233;r&#244;me Richard
J&#233;r&#244;me Richard

Reputation: 50806

Numba can hardly speed up Numpy functions since they are already mostly optimized. However, the performance the Numpy codes can be improved by avoiding the creation/filling of many huge temporary array. Indeed, the RAM throughput is a precious scarce resource compared the processing power of modern processors (and this is getting worse since the 4 last decades so nobody expect this to change any time soon -- this is called the Memory Wall). The solution to solve this problem is simply to perform many Numpy operations in at once in loop nests. In your case, most operation can be performed in the L1 cache or even in registers. The processor can operate on them a way that is several order of magnitude. Moreover, the loops can be easily parallelized. I also find the nested loops easier to read and optimize.

Here is the resulting code (note that few calls has been modified since Numba does not fully supports Numpy yet):

import  numpy as np
import  numba as nb
import time


@nb.njit('(float64[:,::1], float64[:,::1], float64[:,::1])', parallel=True)
def ortoSegmentPoint_all(p_array, x_array, y_array):
    n = p_array.shape[0]
    m = x_array.shape[0]
    m_array = np.divide(y_array[:, 0] - y_array[:, 1], x_array[:, 0] - x_array[:, 1])
    inv_m_array = np.divide(1, m_array)
    n_array = y_array[:, 0] - x_array[:, 0] * m_array
    mask = np.empty((n, m), dtype=np.bool_)
    x_min, x_max = np.minimum(x_array[:,0], x_array[:,1]), np.maximum(x_array[:,0], x_array[:,1])
    y_min, y_max = np.minimum(y_array[:,0], y_array[:,1]), np.maximum(y_array[:,0], y_array[:,1])
    for i in nb.prange(n):
        for j in range(m):
            n_orto = p_array[i, 1] + inv_m_array[j] * p_array[i, 0]
            x_intersec = (n_orto - n_array[j]) / (m_array[j] + inv_m_array[j])
            y_intersec = m_array[j] * x_intersec + n_array[j]
            maskX = x_min[j] < x_intersec < x_max[j]
            maskY = y_min[j] < y_intersec < y_max[j]
            mask[i, j] = maskX & maskY
    return mask

@nb.njit('(float64[:,::1], float64[:,::1], float64[:,::1], int64[::1], bool_[:,::1])', parallel=True)
def ortoDistancePointLine_indx(x_array, y_array, p_array, index_point, mask):
    a = np.vstack((x_array[:, 0], y_array[:, 0]))
    b = np.vstack((x_array[:, 1], y_array[:, 1]))
    a0 = a[1, :] - b[1, :]
    b0 = b[0, :] - a[0, :]
    c0 = a[0, :] * b[1, :] - b[0, :] * a[1, :]
    index_point = np.unique(index_point)
    n = index_point.size
    m = x_array.shape[0]
    _d = np.empty((n, m), np.float64)
    for i in nb.prange(n):
        for j in range(m):
            point_id = index_point[i]
            _x, _y = p_array[point_id]
            _d0 = np.abs(a0[j] * _x + b0[j] * _y + c0[j])
            _d1 = np.sqrt(a0[j]**2 + b0[j]**2)
            tmp = (_d0 / _d1) * mask[point_id, j]
            if tmp == 0:
                tmp = 1000000.0
            _d[i, j] = tmp
    return _d, index_point

p_array = np.load('p_array.npy')
x_array = np.load('x_array.npy')
y_array = np.load('y_array.npy')

#CREATE BOOLEAN MASK
t0 = time.time()
mask = ortoSegmentPoint_all(p_array, x_array, y_array)
print('mask: ', time.time() - t0)

# ORTHOGONAL DISTANCE
rows, cols = np.where(mask)
rows = rows.copy()

t0 = time.time()
dist_orto, idx_orto = ortoDistancePointLine_indx(x_array, y_array, p_array, rows, mask)
print('orto distance: ', time.time() - t0)

Here are the output before and after the optimisation on my 6-core machine:

Before:
    mask:           1.323328
    orto distance:  0.544939

After:
    mask:           0.040068
    orto distance:  0.025106

Note that the code can be optimized further. For example, you can play with Numba flags like fastmath and error_model. You can also pre-compute the inverse division of m_array + inv_m_array. Finally, you can use some more advance micro-optimisations (like register tiling, and loop splitting so to improve vectorization) also it make your code more complex and it should be enough.

Upvotes: 2

Related Questions