Reputation: 630
I have 3 arrays (x_array, y_array, p_array), the first two correspond to 2d arrays with coordinates points of random points, the third is an flatten array of points corresponding to lines.
I need to calculate the minimum orthogonal distance for each x_array, y_array point to the lines form by p_array points.
For this I have written two functions that uses mostly numpy
. The first function creates a boolean mask to get which points have an orthogonal projection to the lines, this produces a 2d array with True and False values. The second function calculates the orthogonal distance to the line or the orthogonal projection of intersection with the points.
This process works but it is part of a GUI where this processes is time sensitive.
I would like to speed up this function but my knowledge of numpy
am programing in general are not enough to improve this. I have already written this using numba
but it gets an small reduction in from 1.68 s to 1.09 for the first function.
Goal
My goal is to reduce the time of both function under 1s, I do not know is this is possible. I would appreciate any help
Results
mask: 1.6823785305023193
mask numba: 1.0962464809417725
orto distance: 1.630366563796997
Code
data:
https://drive.google.com/file/d/1GHDSH1ycQSiBJk8RvkTNiE_RoetwCZw6/view?usp=sharing https://drive.google.com/file/d/1k8wCop1fePEh7ANfkScQ_BHLGRVo-aJe/view?usp=sharing https://drive.google.com/file/d/1swhg1BXBh18xyuueOKUNNx3SVGW8Jfq9/view?usp=sharing
import numpy as np
import time
def ortoSegmentPoint_all(p_array, x_array, y_array):
"""
:param p_array: np.array([[ 718898.941 9677612.901 ], [ 718888.8227 9677718.305 ], [ 719033.0528 9677770.692 ]])
:param y_array: np.array([9677656.39934991 9677720.27550726 9677754.79])
:param x_array: np.array([718895.88881594 718938.61392781 718961.46])
:return: [POINT, LINE] indexes where point is orthogonal to line segment
"""
# PENDIENTE "m" de la recta, y = mx + n
m_array = np.divide(y_array[:, 0] - y_array[:, 1], x_array[:, 0] - x_array[:, 1])
# PENDIENTE INVERTIDA, 1/m
inv_m_array = np.divide(1, m_array)
# VALOR "n", y = mx + n
n_array = y_array[:, 0] - x_array[:, 0] * m_array
# VALOR "n_orto" PARA LA RECTA PERPENDICULAR
_len_p_array = len(p_array)
n_orto_array = np.reshape(p_array[:, 1], (_len_p_array, 1)) + inv_m_array * np.reshape(p_array[:, 0], (_len_p_array, 1))
# PUNTOS DONDE SE INTERSECTAN DE FORMA PERPENDICULAR
x_intersec_array = np.divide(n_orto_array - n_array, m_array + inv_m_array)
y_intersec_array = m_array * x_intersec_array + n_array
# LISTAR COORDENADAS EN PARES
# FILAS: NUMERO DE PUNTOS, COLUMNAS: NUMERO DE TRAMOS
maskX = np.where(np.logical_and(x_intersec_array < np.max(x_array, axis=1), x_intersec_array > np.min(x_array, axis=1)), True, False)
maskY = np.where(np.logical_and(y_intersec_array < np.max(y_array, axis=1), y_intersec_array > np.min(y_array, axis=1)), True, False)
mask = maskY * maskX
return mask
def ortoDistancePointLine_indx(x_array, y_array, p_array, index_point, mask):
' a=(yA−yB), b=(xB−xA) and c=xAyB−xByA '
a, b = np.array([x_array[:, 0], y_array[:, 0]]), np.array([x_array[:, 1], y_array[:, 1]])
a0, b0, c0 = a[1, :] - b[1, :], b[0, :] - a[0, :], a[0, :] * b[1, :] - b[0, :] * a[1, :]
index_point = np.unique(index_point)
_len = len(p_array[index_point])
_x = p_array[index_point][:, 0].reshape(_len, 1)
_y = p_array[index_point][:, 1].reshape(_len, 1)
_a0, _b0, _c0 = np.full(shape=(_len, a0.size), fill_value=a0), np.full(shape=(_len, b0.size), fill_value=b0), np.full(shape=(_len, c0.size), fill_value=c0)
_d0 = np.abs(_a0 * _x + _b0 * _y + _c0)
_d1 = np.sqrt((_a0 * _a0) + (_b0 * _b0))
_d = np.divide(_d0, _d1) * mask[index_point, :]
_d = np.where(_d == 0, 1000000.0, _d)
return _d, index_point
p_array = np.load('p_array.npy')
x_array = np.load('x_array.npy')
y_array = np.load('y_array.npy')
#CREATE BOOLEAN MASK
t0 = time.time()
mask = ortoSegmentPoint_all(p_array, x_array, y_array)
print('mask: ', time.time() - t0)
# ORTHOGONAL DISTANCE
rows, cols = np.where(mask)
t0 = time.time()
dist_orto, idx_orto = ortoDistancePointLine_indx(x_array, y_array, p_array, rows, mask)
print('orto distance: ', time.time() - t0)
Upvotes: 1
Views: 286
Reputation: 50806
Numba can hardly speed up Numpy functions since they are already mostly optimized. However, the performance the Numpy codes can be improved by avoiding the creation/filling of many huge temporary array. Indeed, the RAM throughput is a precious scarce resource compared the processing power of modern processors (and this is getting worse since the 4 last decades so nobody expect this to change any time soon -- this is called the Memory Wall). The solution to solve this problem is simply to perform many Numpy operations in at once in loop nests. In your case, most operation can be performed in the L1 cache or even in registers. The processor can operate on them a way that is several order of magnitude. Moreover, the loops can be easily parallelized. I also find the nested loops easier to read and optimize.
Here is the resulting code (note that few calls has been modified since Numba does not fully supports Numpy yet):
import numpy as np
import numba as nb
import time
@nb.njit('(float64[:,::1], float64[:,::1], float64[:,::1])', parallel=True)
def ortoSegmentPoint_all(p_array, x_array, y_array):
n = p_array.shape[0]
m = x_array.shape[0]
m_array = np.divide(y_array[:, 0] - y_array[:, 1], x_array[:, 0] - x_array[:, 1])
inv_m_array = np.divide(1, m_array)
n_array = y_array[:, 0] - x_array[:, 0] * m_array
mask = np.empty((n, m), dtype=np.bool_)
x_min, x_max = np.minimum(x_array[:,0], x_array[:,1]), np.maximum(x_array[:,0], x_array[:,1])
y_min, y_max = np.minimum(y_array[:,0], y_array[:,1]), np.maximum(y_array[:,0], y_array[:,1])
for i in nb.prange(n):
for j in range(m):
n_orto = p_array[i, 1] + inv_m_array[j] * p_array[i, 0]
x_intersec = (n_orto - n_array[j]) / (m_array[j] + inv_m_array[j])
y_intersec = m_array[j] * x_intersec + n_array[j]
maskX = x_min[j] < x_intersec < x_max[j]
maskY = y_min[j] < y_intersec < y_max[j]
mask[i, j] = maskX & maskY
return mask
@nb.njit('(float64[:,::1], float64[:,::1], float64[:,::1], int64[::1], bool_[:,::1])', parallel=True)
def ortoDistancePointLine_indx(x_array, y_array, p_array, index_point, mask):
a = np.vstack((x_array[:, 0], y_array[:, 0]))
b = np.vstack((x_array[:, 1], y_array[:, 1]))
a0 = a[1, :] - b[1, :]
b0 = b[0, :] - a[0, :]
c0 = a[0, :] * b[1, :] - b[0, :] * a[1, :]
index_point = np.unique(index_point)
n = index_point.size
m = x_array.shape[0]
_d = np.empty((n, m), np.float64)
for i in nb.prange(n):
for j in range(m):
point_id = index_point[i]
_x, _y = p_array[point_id]
_d0 = np.abs(a0[j] * _x + b0[j] * _y + c0[j])
_d1 = np.sqrt(a0[j]**2 + b0[j]**2)
tmp = (_d0 / _d1) * mask[point_id, j]
if tmp == 0:
tmp = 1000000.0
_d[i, j] = tmp
return _d, index_point
p_array = np.load('p_array.npy')
x_array = np.load('x_array.npy')
y_array = np.load('y_array.npy')
#CREATE BOOLEAN MASK
t0 = time.time()
mask = ortoSegmentPoint_all(p_array, x_array, y_array)
print('mask: ', time.time() - t0)
# ORTHOGONAL DISTANCE
rows, cols = np.where(mask)
rows = rows.copy()
t0 = time.time()
dist_orto, idx_orto = ortoDistancePointLine_indx(x_array, y_array, p_array, rows, mask)
print('orto distance: ', time.time() - t0)
Here are the output before and after the optimisation on my 6-core machine:
Before:
mask: 1.323328
orto distance: 0.544939
After:
mask: 0.040068
orto distance: 0.025106
Note that the code can be optimized further. For example, you can play with Numba flags like fastmath
and error_model
. You can also pre-compute the inverse division of m_array + inv_m_array
. Finally, you can use some more advance micro-optimisations (like register tiling, and loop splitting so to improve vectorization) also it make your code more complex and it should be enough.
Upvotes: 2