titanium
titanium

Reputation: 105

Is there any numpy tricks to avoid for loops in this piece of code?

I have a following pice of code, which basically sets certain elements of a long numpy array to zero.

import numpy as np
a = np.array([[[1, 2, 3, 4], [0, 1, 2, 3], [1, 2, 3, 4]], 
              [[0, 1, 2, 3], [0, 0, 1, 2], [0, 2, 3, 4]]])

aup = a + 2

b = np.array([[[0, 1, 2], [3, 4, 5], [0, 2, 3], [5, 6, 7], [0, 1, 2]],
              [[3, 4, 5], [0, 2, 3], [5, 6, 7], [0, 1, 2], [3, 4, 5]],
              [[0, 1, 2], [3, 4, 5], [0, 2, 3], [5, 6, 7], [0, 1, 2]], 
              [[3, 4, 5], [0, 2, 3], [5, 6, 7], [0, 1, 2], [3, 4, 5]],
              [[0, 1, 2], [3, 4, 5], [0, 2, 3], [5, 6, 7], [0, 1, 2]]])

for i in range(3):
    for j in range(4):
        for row_t in range(a[0, i, j], aup[0, i, j]):
            for col_t in range(a[1, i, j], aup[1, i, j]):
                row = row_t%5
                col = col_t%5
                b[row, col, i] = 0

The main problem is the run time, which can be a lot because of the 4 for loops I have. Is there any numpy trick to avoid these for-loops and basically attain the same?

Update:

I think it all boils down to the following problem. First find the minimum of a and maximum of aup over axis 2, i.e.

min_a = np.min(a, axis=2)%5  \\ [[1 0 1], [0 0 0]]
max_aup = np.max(aup, axis=2)%5 \\ [[1 0 1], [0 4 1]]

Npw list all the element wise tuples that form between min_a and min_aup. with corresponding index element:

tuples = [[1, 1, 0], [0, 0, 1], [0, 1, 1], [0, 2, 1], [0, 3, 1], 
          [0, 4, 1], [1, 0, 2], [1, 1, 2]]

and set the elements of b at those tuples to be 0:

b[tuples] = 0

So, the entire problem is basically efficiently finding these tuples.

Upvotes: 1

Views: 1085

Answers (1)

Paul Panzer
Paul Panzer

Reputation: 53029

Ok, here is a vectorized version of your loops. It explicitly uses the fact that aup is a plus scalar offset:

>>> import numpy as np
>>> a = np.array([[[1, 2, 3, 4], [0, 1, 2, 3], [1, 2, 3, 4]], 
...               [[0, 1, 2, 3], [0, 0, 1, 2], [0, 2, 3, 4]]])
>>> 
>>> aup = a + 2
>>> 
>>> b = np.array([[[0, 1, 2], [3, 4, 5], [0, 2, 3], [5, 6, 7], [0, 1, 2]],
...               [[3, 4, 5], [0, 2, 3], [5, 6, 7], [0, 1, 2], [3, 4, 5]],
...               [[0, 1, 2], [3, 4, 5], [0, 2, 3], [5, 6, 7], [0, 1, 2]], 
...               [[3, 4, 5], [0, 2, 3], [5, 6, 7], [0, 1, 2], [3, 4, 5]],
...               [[0, 1, 2], [3, 4, 5], [0, 2, 3], [5, 6, 7], [0, 1, 2]]])
>>> 
>>> b0 = b.copy()
>>> 
>>> 
>>> for i in range(3):
...     for j in range(4):
...         for row_t in range(a[0, i, j], aup[0, i, j]):
...             for col_t in range(a[1, i, j], aup[1, i, j]):
...                 row = row_t % 5
...                 col = col_t % 5
...                 b[row, col, i] = 0
... 
>>> b_loopy = b
>>> b = b0.copy()
>>> 
>>> i, j, k, _ = np.ogrid[:2, :2, :3, :0]
>>> 
>>> b[(a[0] + i) % 5, (a[1] + j) % 5, k] = 0
>>> 
>>> b_vect = b
>>> 
>>> np.all(b_vect == b_loopy)
True

For arbitrary aup > a it's a bit more hairy. The code below is slightly slower than the loop for small problem size, but scales much better, see timings at the end of this post.

import numpy as np

a = np.array([[[1, 2, 3, 4], [0, 1, 2, 3], [1, 2, 3, 4]], 
              [[0, 1, 2, 3], [0, 0, 1, 2], [0, 2, 3, 4]]])

aup = a + np.random.randint(2, 5, a.shape)

b = np.array([[[0, 1, 2], [3, 4, 5], [0, 2, 3], [5, 6, 7], [0, 1, 2]],
              [[3, 4, 5], [0, 2, 3], [5, 6, 7], [0, 1, 2], [3, 4, 5]],
              [[0, 1, 2], [3, 4, 5], [0, 2, 3], [5, 6, 7], [0, 1, 2]], 
              [[3, 4, 5], [0, 2, 3], [5, 6, 7], [0, 1, 2], [3, 4, 5]],
              [[0, 1, 2], [3, 4, 5], [0, 2, 3], [5, 6, 7], [0, 1, 2]]])

def setup(i, j, k, l):
    b = np.random.randint(1, 10, (i, j, k))
    a = np.array(np.unravel_index(np.random.randint(0, i*j, (k, l)), (i, j)))
    aup = a + 1 + np.array(np.unravel_index(np.random.randint(
        0, (i-2)*(j-2), (k, l)), (i-2, j-2)))
    return b, a, aup

def f_loopy(b, a, aup):
    b = b.copy()
    for i in range(b.shape[-1]):
        for j in range(a.shape[-1]):
            for row_t in range(a[0, i, j], aup[0, i, j]):
                for col_t in range(a[1, i, j], aup[1, i, j]):
                    row = row_t % b.shape[0]
                    col = col_t % b.shape[1]
                    b[row, col, i] = 0
    return b

def unwrap_indices(a, aup, shp):
    i, j, k = shp
    _, k, m = a.shape
    ij = np.array((i, j))[:, None]
    m *= k
    a = a.reshape(2, -1)
    aup = aup.reshape(2, -1)
    fst, snd = mask = aup > ij
    fsti = np.flatnonzero(fst)
    sndi = np.flatnonzero(snd)
    bthi = fsti[snd[fsti]]
    m2 = m + len(fsti)
    m3 = m2 + len(sndi)
    d = np.empty((2, m3 + len(bthi)), dtype=int)
    d[0, m:m2] = aup[0, fsti] - i
    d[1, m2:m3] = aup[1, sndi] - j
    d[:, m3:] = aup[:, bthi] - ij
    d[:, :m] = np.where(mask, ij, aup) - a
    d[1, m:m2] = d[1, fsti]
    d[0, m2:m3] = d[0, sndi]
    aa = np.empty_like(d)
    aa[:, :m] = a
    aa[1, m:m2] = a[1, fsti]
    aa[0, m2:m3] = a[0, sndi]
    aa[0, m:m2] = 0
    aa[1, m2:m3] = 0
    aa[:, m3:] = 0
    z = np.empty(aa.shape[1:], dtype=int)
    z[:m].reshape(k, -1)[...] = np.arange(k)[:, None]
    z[m:m2] = z[fsti]
    z[m2:m3] = z[sndi]
    z[m3:] = z[bthi]
    return aa, d, z

def embed_indices_flat(aa, d, z, shp):
    i, j, k = shp
    _, m = aa.shape
    A = np.ravel_multi_index((aa[0], aa[1], z), shp)
    A[1:] -= A[:-1] + np.einsum('ij,i->j', d[:, :-1]-1, (j*k, k))
    A1 = (((j+1)-d[1]) * k).repeat(d[0]) 
    A1[0] = A[0]
    A1[d[0, :-1].cumsum()] = A[1:]
    idx = d[1].repeat(d[0]).cumsum()
    A2 = np.full(idx[-1:], k)
    A2[0] = A1[0]
    A2[idx[:-1]] = A1[1:]
    return A2.cumsum()

def f_vect(b, a, aup, switch_strat=20):
    b = b.copy()
    A, D, Z = unwrap_indices(a, aup, b.shape)
    if D.sum() > switch_strat * D.size:
        for ai, di, zi in zip(A.T, D.T, Z):
            b[ai[0]:ai[0]+di[0], ai[1]:ai[1]+di[1], zi] = 0
    else:
        b.ravel()[embed_indices_flat(A, D, Z, b.shape)] = 0
    return b

from timeit import timeit

print(np.all(f_vect(b, a, aup) == f_loopy(b, a, aup)))
print(timeit(lambda: f_vect(b, a, aup), number=1000))
print(timeit(lambda: f_loopy(b, a, aup), number=1000))

b, a, aup = setup(40, 30, 10, 8)

print(np.all(f_vect(b, a, aup) == f_loopy(b, a, aup)))
print(timeit(lambda: f_vect(b, a, aup), number=1000))
print(timeit(lambda: f_loopy(b, a, aup), number=1000))

Sample output:

True                      # <- results equal
0.08376670675352216       # <- vectorized
0.062134379986673594      # <- loopy
True                      # same for larger problem size 
0.3771689278073609        #
8.375985411927104         #

Upvotes: 6

Related Questions