Reputation: 113
What is the optimal (fastest) way to compute the following expression:
\sum_{i \in I} \alpha_i \sum_{j \in J} \beta_j M[:, i, j]
for the given numpy arrays:
- I, J containing indices;
- alpha and beta of shapes (1, |I|) and (1, |J|) containing coefficients;
- M of ndim=3.
In the more general case, I need to compute this for many alpha, beta, I, J inputs with the same M array. So it may be assumed that alphas and Is have shape (N, 4), betas and Js have shape (N, 3), and I need to compute this expression for each n in range(N).
Thank you in advance.
Based on some comments, to make the question clearer and to add some context, here is a naive approach to the problem with realistic sizes:
- M has shape (500, 200000, 20)
- I has shape (10^6, 4)
- J has shape (10^6, 3)
- alpha has shape (10^6, 4)
- beta has shape (10^6, 3)
N = 10**6
M_new = np.zeros((M.shape[0], N))  # note: the shape must be passed as a tuple
for n in range(N):
    for i in range(4):
        for j in range(3):
            M_new[:, n] += alpha[n, i] * beta[n, j] * M[:, I[n, i], J[n, j]]
So the question is how to compute M_new as fast as possible.
Solutions
So far the fastest solution is the one proposed by @jdehesa using Numba.
@Han-KwangNienhuys presented a speed comparison with alternative methods.
Upvotes: 1
Views: 100
Reputation: 3244
With numpy arrays, you can index like this: a[[1, 7, 5]], which is roughly equivalent to [a[1], a[7], a[5]]. You can use this to select M[:, I[n], :] or M[:, :, J[n]]. However, M[:, I[n], J[n]] will not work; instead you need to do M[:, I[n], :][:, :, J[n]]. Fortunately, the last axis of M is small: M[:, I[n], :] has shape (500, 4, 20); reducing that to (500, 4, 3) does not waste too much copying. The other way around, M[:, :, J[n]][:, I[n], :] would produce the same result, but would create an intermediate result with shape (500, 200000, 3), which would incur vastly more overhead.
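For instance, a minimal demonstration of this indexing behavior (with small placeholder shapes, not the real sizes):
import numpy as np
a = np.arange(0, 100, 10)
print(a[[1, 7, 5]])  # [10 70 50], same as [a[1], a[7], a[5]]
M = np.zeros((6, 8, 5))  # stand-in for the real (500, 200000, 20)
print(M[:, [0, 2, 4, 6], :].shape)  # (6, 4, 5)
print(M[:, [0, 2, 4, 6], :][:, :, [1, 2, 3]].shape)  # (6, 4, 3)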
Here is how to do it:
np.random.seed(1)

# generate test arrays
N, p, q, r = 11, 5, 9, 7  # realistic values 1e6, 500, 2e5, 20
M = np.random.randint(0, 20, size=(p, q, r))
mi, mj = 4, 3
I = np.random.randint(0, q-1, size=(N, mi))
J = np.random.randint(0, r-1, size=(N, mj))
alpha = np.random.randint(-99, 99, size=(N, mi))
beta = np.random.randint(-99, 99, size=(N, mj))

# Reference implementation
M_new = np.zeros((M.shape[0], N))
for n in range(N):
    for i in range(mi):
        for j in range(mj):
            M_new[:, n] += alpha[n, i] * beta[n, j] * M[:, I[n, i], J[n, j]]

# New implementation
M_new2 = np.zeros((p, N))
for n in range(N):
    M_sub = M[:, I[n]][:, :, J[n]]  # shape (p, mi, mj)
    M_new2[:, n] = np.einsum('i,j,kij', alpha[n], beta[n], M_sub)

assert np.all(M_new == M_new2)
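For reference, the subscripts 'i,j,kij' use einsum's implicit output mode: k is the only index appearing exactly once, so it becomes the output axis, i.e. this is the same as 'i,j,kij->k'. An equivalent plain-numpy formulation of one output column (a sketch):
# same result as np.einsum('i,j,kij', alpha[n], beta[n], M_sub):
weights = alpha[n][:, None] * beta[n][None, :]  # shape (mi, mj)
col = (M_sub * weights).sum(axis=(1, 2))  # shape (p,), equals M_new2[:, n]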
As an alternative to M[:, I[n], :][:, :, J[n]], it's possible to construct two index arrays ii and jj, both with shape (4, 3), so that you can take M[:, ii, jj]:
M_new3 = np.zeros_like(M_new2)
for n in range(N):
    # ii, jj: shape (mi, mj)
    ii, jj = np.meshgrid(I[n], J[n], indexing='ij')
    M_sub = M[:, ii, jj]  # shape (p, mi, mj)
    M_new3[:, n] = np.einsum('i,j,kij', alpha[n], beta[n], M_sub)
assert np.all(M_new == M_new3)
However, this is tricky to do right: it's easy to forget the indexing='ij' parameter of np.meshgrid. For small datasets (N, p, q, r = 10000, 50, 2000, 20) it seems to be slower than the first implementation (M_new2).
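A quick illustration of why the indexing='ij' parameter matters (small made-up values):
ii, jj = np.meshgrid([10, 20, 30, 40], [1, 2, 3], indexing='ij')
print(ii.shape, jj.shape)  # (4, 3) (4, 3): rows follow I[n], columns follow J[n]
ii_xy, jj_xy = np.meshgrid([10, 20, 30, 40], [1, 2, 3])  # default indexing='xy'
print(ii_xy.shape)  # (3, 4): transposed, so M[:, ii_xy, jj_xy] would be wrong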
EDIT: one more version, without any for loops:
assert mi*mj*p*N < 1e8 # prevent out-of-memory error
ii = np.empty((N, mi, mj, p), dtype=np.int32)
ii[:] = I.reshape(N, mi, 1, 1)
jj = np.empty((N, mi, mj, p), dtype=np.int32)
jj[:] = J.reshape(N, 1, mj, 1)
kk = np.empty((N, mi, mj, p), dtype=np.int32)
kk[:] = np.arange(p).reshape(1, 1, 1, p)
M_sub = M[kk, ii, jj] # shape (N, mi, mj, p)
M_new4 = np.einsum('ni,nj,nijk->kn', alpha, beta, M_sub)
assert np.all(M_new == M_new4)
For the original array sizes, the ii, jj, kk, M_sub arrays will each have 6e+9 elements, so you'll need a lot of memory (compared to M, which has 2e+9 elements). It's probably useful to split up the job into chunks (along the N axis) that each fit in the CPU's L3 cache, and also to lay out M_sub with shape (N, p, mi, mj) in memory (not by reshaping) and M_new as (N, p).
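A minimal sketch of such chunking (the chunk size is an assumption to be tuned to the cache; this version keeps the (p, N) output layout from above and avoids materializing the big kk, ii, jj temporaries by using read-only broadcast views):
CHUNK = 10_000  # assumed chunk size; tune so the temporaries fit in cache
M_new5 = np.empty((p, N))
kk0 = np.arange(p).reshape(1, 1, 1, p)
for start in range(0, N, CHUNK):
    sl = slice(start, min(start + CHUNK, N))
    nc = sl.stop - sl.start
    # broadcast_to returns views, so only M_sub is actually allocated
    ii = np.broadcast_to(I[sl].reshape(nc, mi, 1, 1), (nc, mi, mj, p))
    jj = np.broadcast_to(J[sl].reshape(nc, 1, mj, 1), (nc, mi, mj, p))
    kk = np.broadcast_to(kk0, (nc, mi, mj, p))
    M_sub = M[kk, ii, jj]  # shape (nc, mi, mj, p)
    M_new5[:, sl] = np.einsum('ni,nj,nijk->kn', alpha[sl], beta[sl], M_sub)
assert np.all(M_new == M_new5)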
For N, p, q, r = 10000, 50, 2000, 20 and integer multiplications:
- M[:, I[n]][:, :, J[n]]: 171 ms
- meshgrid(I[n], J[n], indexing='ij'): 360 ms
- M[kk, ii, jj]: 129 ms
- numba.njit (from the other answer): 20 ms
Timings include the assert statements.
Upvotes: 2
Reputation: 59681
EDIT: I misunderstood the question at first. The original answer is left below, but it does not do what the question asks.
At the risk of sounding obvious, you can always resort to Numba:
import numpy as np
import numba as nb

# Original loop implementation
def comb_loop(m, ii, jj, alpha, beta):
    n = ii.shape[0]
    m_new = np.zeros((m.shape[0], n))
    for col in range(n):
        for i in range(4):
            for j in range(3):
                m_new[:, col] += alpha[col, i] * beta[col, j] * m[:, ii[col, i], jj[col, j]]
    return m_new

# Numba implementation
@nb.njit(parallel=True)
def comb_nb(m, ii, jj, alpha, beta):
    n = ii.shape[0]
    m_new = np.empty((m.shape[0], n), m.dtype)
    for col in nb.prange(n):
        for row in range(m.shape[0]):
            val = 0
            for i in range(4):
                for j in range(3):
                    val += alpha[col, i] * beta[col, j] * m[row, ii[col, i], jj[col, j]]
            m_new[row, col] = val
    return m_new
# Test
np.random.seed(0)
N = 1_000 # Reduced for testing
m = np.random.rand(500, 200_000, 20)
ii = np.random.randint(m.shape[1], size=(N, 4))
jj = np.random.randint(m.shape[2], size=(N, 3))
alpha = np.random.rand(N, 4)
beta = np.random.rand(N, 3)
# Check results match
print(np.allclose(comb_loop(m, ii, jj, alpha, beta), comb_nb(m, ii, jj, alpha, beta)))
# True
# Timings
%timeit comb_loop(m, ii, jj, alpha, beta)
# 181 ms ± 1.77 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit comb_nb(m, ii, jj, alpha, beta)
# 31.1 ms ± 2.03 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
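One caveat: the first call to comb_nb triggers JIT compilation; here the allclose check above happens to absorb that cost, but in a fresh session you'd warm the function up before timing (a sketch):
# compile once on small inputs so the timings measure execution only
_ = comb_nb(m[:2], ii[:2], jj[:2], alpha[:2], beta[:2])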
ORIGINAL WRONG ANSWER
You can use np.einsum:
import numpy as np
def comb(alpha, beta, m):
    return np.einsum('i,j,nij->n', alpha, beta, m)
# Test
np.random.seed(0)
alpha = np.random.rand(10)
beta = np.random.rand(20)
m = np.random.rand(30, 10, 20)
result = comb(alpha, beta, m)
print(result.shape)
# (30,)
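For comparison, the same contraction without einsum (an equivalent sketch):
# broadcast the coefficients in, then sum out both index axes
result2 = (m * alpha[None, :, None] * beta[None, None, :]).sum(axis=(1, 2))
print(np.allclose(result, result2))
# True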
Upvotes: 3