russell

Reputation: 81

Function to sum over repeated indices

We are looking for ways to improve this small step in a large pipeline we are developing. The problem is:

Given: multiple np.ndarray objects of integer datatype that effectively describe pixel indices, and a single np.ndarray of floating-point type that describes the weights. All of these arrays will have the same shape/size.

Problem: any time a tuple of indices is repeated, sum the weights over the repetitions (so that each tuple is unique by construction).

Return: the unique indices and the summed weights.

We've developed a fairly robust method, but it's quite slow. Consider:

x = np.array([1, 1, 2, 2, 3, 3, 1, 1, 3, 3, 4], dtype=np.uint16)
y = np.array([1, 1, 2, 2, 2, 2, 1, 1, 3, 4, 5], dtype=np.uint16)
l = np.array([1, 2, 2, 2, 3, 2, 1, 1, 3, 3, 6], dtype=np.uint16)
v = np.array([2, 4, 6, 8, 7, 5, 3, 1, 8, 6, 4], dtype=np.float64)

indices = (x, y, l)
dims = [np.amax(index) + 1 for index in indices]
idx = np.ravel_multi_index(indices, dims, order='F')

out, uind, cinv = np.unique(idx, return_index=True, return_inverse=True)
vv = np.bincount(cinv, weights=v)

out = tuple(index[uind] for index in indices)
ret = (vv, *out)

This works and returns the expected result:

>>> vv
array([ 6.,  4., 14.,  5.,  7.,  8.,  6.,  4.])
>>> out
(array([1, 1, 2, 3, 3, 3, 3, 4], dtype=uint16),
 array([1, 1, 2, 2, 2, 3, 4, 5], dtype=uint16),
 array([1, 2, 2, 2, 3, 3, 3, 6], dtype=uint16))

This is an MWE with small arrays, but in practice these arrays will be well in excess of a million elements, and this raises the problem that the call to np.unique with these settings is very slow. We've tried a few things, such as scipy.sparse's csr_matrix and Numba, and for different reasons they weren't much better.

What would be a more efficient way of performing these types of calculations?

Upvotes: 8

Views: 202

Answers (5)

simon

Reputation: 5133

I tried using PyTorch's sparse_coo_tensor and gained a bit of speedup, especially on the GPU, for larger array sizes:

[plot: speedup factor vs. number of values]

Here is the test code:

from timeit import Timer
import matplotlib.pyplot as plt
import numpy as np
import torch

rand = np.random.default_rng(42)
num_timings = 10
num_tests = 9

def given(xyl, vals):
    dims = [np.amax(index) + 1 for index in xyl]
    idx = np.ravel_multi_index(xyl, dims, order='F')
    out, uind, cinv = np.unique(idx, return_index=True, return_inverse=True)
    return np.bincount(cinv, weights=vals), *(index[uind] for index in xyl)

def torch_gpu(xyl_gpu, vals_gpu):
    t = torch.sparse_coo_tensor(xyl_gpu, vals_gpu).coalesce()
    return t.values(), *t.indices()

def torch_cpu(xyl_cpu, vals_cpu):
    t = torch.sparse_coo_tensor(xyl_cpu, vals_cpu).coalesce()
    return t.values(), *t.indices()

all_num_vals = np.logspace(0, num_tests - 1, num=num_tests, dtype=int)
all_fcts = (given, torch_gpu, torch_cpu)
timings_by_fct = {fct.__name__: np.zeros(num_tests) for fct in all_fcts}

for i, num_vals in enumerate(all_num_vals):

    x, y, l = rand.integers(1000, size=(3, num_vals), dtype=np.uint16)
    v = rand.uniform(size=num_vals).astype(np.float64)
    v_gpu = torch.from_numpy(v).to("cuda:0")
    v_cpu = torch.from_numpy(v)
    args_by_fct = {"given": ((x, y, l), v),
                   "torch_gpu": (torch.from_numpy(np.c_[x, y, l].T).to("cuda:0"), v_gpu),
                   "torch_cpu": (torch.from_numpy(np.c_[x, y, l].T), v_cpu)}
    torch.cuda.synchronize()
    
    for fct in all_fcts:
        name = fct.__name__
        timings_by_fct[name][i] = Timer(lambda: fct(*args_by_fct[name])).timeit(num_timings)

plt.xscale("log")       
for name, vals in timings_by_fct.items():
    plt.plot(all_num_vals, timings_by_fct["given"] / vals, label=name)
plt.xlabel("number of values")
plt.ylabel("speedup factor")
plt.legend()

# Test correctness
x = np.array([1, 1, 2, 2, 3, 3, 1, 1, 3, 3, 4], dtype=np.uint16)
y = np.array([1, 1, 2, 2, 2, 2, 1, 1, 3, 4, 5], dtype=np.uint16)
l = np.array([1, 2, 2, 2, 3, 2, 1, 1, 3, 3, 6], dtype=np.uint16)
v = np.array([2, 4, 6, 8, 7, 5, 3, 1, 8, 6, 4], dtype=np.float64)
v_gpu = torch.from_numpy(v).to("cuda:0")
v_cpu = torch.from_numpy(v)
r_given = given((x, y, l), v)
for r_proposed in (torch_gpu(torch.from_numpy(np.c_[x, y, l].T).to("cuda:0"), v_gpu),
                   torch_cpu(torch.from_numpy(np.c_[x, y, l].T), v_cpu)):
    for vals_given, vals_proposed in zip(r_given, r_proposed):
        assert np.allclose(vals_given, vals_proposed.cpu().numpy())

Note that I cheated a bit, in that I did not include the tensor transfers to/from the GPU in the timings. Including them reduces the maximum speedup factor to about 2 on my machine.
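For reference, here is roughly what a transfer-inclusive GPU variant could look like (a sketch assuming a CUDA device is available; this is not what the plot above measures):

def torch_gpu_with_transfers(xyl_np, vals_np):
    # Include the host-to-device and device-to-host copies in the measured work.
    xyl = torch.from_numpy(xyl_np).to("cuda:0")
    vals = torch.from_numpy(vals_np).to("cuda:0")
    t = torch.sparse_coo_tensor(xyl, vals).coalesce()
    # .cpu() synchronizes, so a timing of this function covers the full round trip.
    return t.values().cpu(), *t.indices().cpu()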

Update: I included Mad Physicist's approach in the timings:

[plot: revised timings including Mad Physicist's approach]

Note that I switched to a log scale on the y axis.

Upvotes: 1

ThomasIsCoding

Reputation: 101568

You could probably try pandas' groupby:

import pandas as pd

df = pd.DataFrame({'x':x, 'y': y, 'l': l, 'v': v})
res = df.groupby(["x", "y", "l"], as_index=False).sum()
vv, out = res['v'].to_numpy(), res[res.columns.difference(['v'])].to_numpy()

and you will obtain

vv = 
 [ 6.  4. 14.  5.  7.  8.  6.  4.]

out = 
 [[1 1 1]
 [2 1 1]
 [2 2 2]
 [2 3 2]
 [3 3 2]
 [3 3 3]
 [3 3 4]
 [6 4 5]]
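Note that res.columns.difference(['v']) sorts the column labels alphabetically, which is why the rows above are in (l, x, y) order rather than (x, y, l). If the original column order matters, a small variation is to index the columns explicitly:

out = res[['x', 'y', 'l']].to_numpy()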

Upvotes: 0

Jérôme Richard

Reputation: 50478

Sorting is expensive, especially the argsort called internally by np.unique. We can instead use a dictionary to remove duplicates and perform the accumulation. This is fast when there are many duplicates. However, it can be as slow as a sort (if not a bit worse) when almost all values are distinct: in that case the dictionary takes significant space, and accesses are not cache friendly at all (resulting in much slower fetches). Bloom filters can then be used to speed up the computation (by quickly checking whether an item has never been seen so far while iterating over the values). Unfortunately, Bloom filters are not trivial to implement and I am not aware of any very fast Python library supporting them, especially from Numba code, so I will not use them here. You can see an example in this post. An alternative solution is to parallelize the NumPy sort (which uses a very fast sequential implementation on x86-64 CPUs since NumPy 2.0) so as to speed up the expensive np.unique call. This answer also provides a simple C++ version using a parallel implementation of std::sort (though the one in the previous link should be even faster).

The key point to remember is that the best algorithm depends on the ratio R = np.unique(idx).size / idx.size:

  • R close to 0: pick a dict-based algorithm;
  • R close to 1: pick a sort-based algorithm.
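Computing R exactly requires the very np.unique call we want to avoid, so in practice it has to be estimated, for example on a subsample. Here is a rough sketch of such a heuristic (my addition; the unique ratio of a subsample is biased upward compared to R on the full array, so treat it as a rough guide only):

import numpy as np

def estimate_r(idx, k=100_000, seed=0):
    # Unique ratio of a random subsample of idx; crude, but enough to pick a strategy.
    rng = np.random.default_rng(seed)
    sample = rng.choice(idx, size=min(k, idx.size), replace=False)
    return np.unique(sample).size / sample.size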

Dict-based sequential Numba implementation

Here is a dict-based algorithm implementation in Numba:

import numba as nb
import numpy as np

@nb.njit('(uint16[:], uint16[:], uint16[:], float64[:])')
def compute(x, y, l, v):
    n = x.size
    assert (y.size, l.size, v.size) == (n, n, n)

    # Find the max of each array
    dx, dy, dl = np.uint32(0), np.uint32(0), np.uint32(0)
    for i in range(n):
        dx = max(dx, np.uint32(x[i]))
        dy = max(dy, np.uint32(y[i]))
        dl = max(dl, np.uint32(l[i]))
    dx = dx + 1
    dy = dy + 1
    dl = dl + 1
    assert np.uint64(dx) * np.uint64(dy) * np.uint64(dl) <= np.uint64(0xFFFFFFFF)

    # Use a dict (hash-map) to accumulate the weights of unique values
    weights = {}
    for i in range(n):
        xi = np.uint32(x[i])
        yi = np.uint32(y[i])
        li = np.uint32(l[i])
        idx = ((xi * dy) + yi) * dl + li
        if idx in weights:
            weights[idx] += v[i]
        else:
            weights[idx] = v[i]

    # Iterate on the unique values and store the resulting arrays
    m = len(weights)
    out_x = np.empty(m, dtype=np.uint16)
    out_y = np.empty(m, dtype=np.uint16)
    out_l = np.empty(m, dtype=np.uint16)
    out_w = np.empty(m, dtype=np.float64)
    i = 0
    for idx, w in weights.items():
        out_x[i] = (idx // dl) // dy
        out_y[i] = (idx // dl) % dy
        out_l[i] = idx % dl
        out_w[i] = w
        i += 1
        
    return out_x, out_y, out_l, out_w
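For instance, a usage sketch with the arrays from the question (the dtypes must match the signature exactly: uint16 indices, float64 weights; also note the tuples come back in insertion order rather than the sorted order np.unique produces):

out_x, out_y, out_l, out_w = compute(x, y, l, v)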

Please note that most of the time is spent in dictionary accesses on my machine, and unfortunately Numba dictionaries are relatively slow, so we should not expect a big speed-up.


Sort-based parallel C++ implementation

A simple alternative way to make np.unique faster is to use a parallel sort. An argsort is not required here and is actually overkill: you can just sort (index, value) pairs directly. Implementing a fast parallel sort manually is pretty challenging though. A simple way to reuse an existing implementation is to write native C++ code calling std::sort with the std::execution::par_unseq policy. Then you can easily track the duplicates and do the accumulation. Here is the resulting code:

#include <cstdlib>
#include <cstdio>
#include <cassert>
#include <cstdint>
#include <algorithm>
#include <execution>
#include <vector>

#ifdef _WIN32
    #define DLL_EXPORT __declspec(dllexport)
#else
    #define DLL_EXPORT
#endif

extern "C"
{
    DLL_EXPORT int32_t compute(uint16_t* x, uint16_t* y, uint16_t* l, double* v, uint16_t* out_x, uint16_t* out_y, uint16_t* out_l, double* out_w, int32_t n)
    {
        using SortedItem = std::pair<uint64_t, double>;
        std::vector<SortedItem> items(n);

        if(n == 0)
            return 0;

        for (int32_t i = 0; i < n; ++i)
            items[i] = std::make_pair((uint64_t(x[i]) << 32) | (uint64_t(y[i]) << 16) | uint64_t(l[i]), v[i]);

        std::sort(std::execution::par_unseq, items.begin(), items.end());
        int32_t m = 1;

        for (int32_t i = 1; i < n; ++i)
            m += items[i-1].first != items[i].first;

        int32_t pos = 0;
        double sum = 0.0;

        for (int32_t i = 0; i < n; ++i)
        {
            const auto curr_idx = items[i].first;
            sum += items[i].second;

            // The short-circuit guarantees items[i+1] is only read when i < n-1,
            // avoiding an out-of-bounds access on the last element.
            if(i == n-1 || curr_idx != items[i+1].first)
            {
                out_x[pos] = uint16_t(curr_idx >> 32);
                out_y[pos] = uint16_t(curr_idx >> 16);
                out_l[pos] = uint16_t(curr_idx);
                out_w[pos] = sum;
                pos++;
                sum = 0.0;
            }
        }

        assert(pos == m);
        return m;
    }
}

You can compile this with the following Makefile:

all:
    mkdir -p build
    # -fPIC is required at compile time when building a shared library on Linux
    clang++ --std=c++17 -O3 -fPIC -fopenmp -c compute.cpp -o build/compute.obj
    clang++ -shared build/compute.obj -o build/compute.dll -fPIC -fopenmp

This means you need Clang (Clang-CL on Windows) and a basic GNU-like environment (e.g. MinGW or MSys on Windows) to compile the library. You could certainly use an MSVC environment on Windows, but AFAIK the parallel STL then requires the Intel TBB library. On Linux, you need to replace the "dll" extension with "so" and prefix the name with "lib": "libcompute.so". The generated library can be distributed to users.

You can then load it directly at runtime. Here is the final Python code:

import ctypes
import numpy as np

# Load the DLL and define the prototype of the native C++ function
dll = ctypes.CDLL('./build/compute.dll')
int32_t = ctypes.c_int32
array_u16_t = np.ctypeslib.ndpointer(dtype=np.uint16, ndim=1, flags='C_CONTIGUOUS')
array_f64_t = np.ctypeslib.ndpointer(dtype=np.float64, ndim=1, flags='C_CONTIGUOUS')
dll.compute.argtypes = [array_u16_t, array_u16_t, array_u16_t, array_f64_t,
                        array_u16_t, array_u16_t, array_u16_t, array_f64_t, int32_t]
dll.compute.restype = int32_t

# Wrapping function
def compute_native(x, y, l, v):
    n = x.size
    assert (y.size, l.size, v.size) == (n, n, n)
    out_x = np.empty(n, dtype=np.uint16)
    out_y = np.empty(n, dtype=np.uint16)
    out_l = np.empty(n, dtype=np.uint16)
    out_w = np.empty(n, dtype=np.float64)
    m = dll.compute(x, y, l, v, out_x, out_y, out_l, out_w, n)
    out_x = out_x[:m].copy()
    out_y = out_y[:m].copy()
    out_l = out_l[:m].copy()
    out_w = out_w[:m].copy()
    return out_x, out_y, out_l, out_w
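A usage sketch with the question's arrays (the wrapper expects C-contiguous arrays of exactly the declared dtypes, which the question's arrays already are):

out_x, out_y, out_l, out_w = compute_native(x, y, l, v)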

Benchmark

Here are performance results on my i5-9600KF CPU (6 cores), with arrays of size n=1_000_000, uniformly-distributed random numbers and 3 different R values (variable range for np.random.randint):

With R ~= 1.00 (poor use-case for the dict-based implementation)
    - seq MadPhysicist's code:    8437 ms      (takes 8 GiB of RAM!)
    - seq initial code:            212 ms
    - seq Numba dict-based code:   132 ms      <----- 1.6 times faster
    - seq C++ dict-based code:     121 ms
    - par C++ sort-based code:      32 ms      <----- 6.6 times faster

With R ~= 0.26 (better use-case for dicts)
    - seq initial code:            201 ms
    - seq Numba dict-based code:    72 ms      <----- 2.8 times faster
    - seq C++ dict-based code:      45 ms
    - par C++ sort-based code:      31 ms      <----- 6.5 times faster
    - seq MadPhysicist's code:      29 ms

With R ~= 0.03 (perfect use-case for dicts)
    - seq initial code:            192 ms
    - seq Numba dict-based code:    50 ms      <----- 3.8 times faster
    - par C++ sort-based code:      29 ms      <----- 6.6 times faster
    - seq MadPhysicist's code:      14 ms
    - seq C++ dict-based code:      12 ms

Thus, the proposed sequential dict-based implementation is 1.6~3.8 times faster than the initial code on large arrays. On smaller arrays, the same effect is visible, though the speed-ups are smaller.

Note that "seq Numba dict-based code" is an equivalent of the provided Numba code rewritten in C++, and using a fast hash-map implementation (Tessil's robin-hood hash-maps).

The parallel C++ sort-based implementation is 6.6 times faster than the initial code on large arrays. It is the fastest one overall. This is mainly because it runs on multiple cores as opposed to other implementations.


Additional notes

Please note that when R is pretty small (e.g. <0.1), using multiple threads with thread-local dictionaries (merged at the end) should result in a significantly faster implementation. When R is big, this is not worth it with Numba because it should not scale (due to allocations) and it is pretty complicated to implement. If you want to parallelize the dict-based implementation, then I strongly advise you to use a native language like C++. Doing this efficiently is far from easy.

Upvotes: 2

Mad Physicist

Reputation: 114320

You can generate the results you want in two linear passes without needing to sort anything at all using np.add.at. It would be helpful if your indices were allocated up-front as a single buffer instead of three separate ones:

indices = np.stack((x, y, l), axis=0)

Either way, you can compute vv directly, without resorting to sorting:

dims = indices.max(axis=1) + 1
idx = np.ravel_multi_index(indices, dims, order="F")

vv = np.zeros(np.prod(dims))
np.add.at(vv, idx, v)

So far so good. But now you want copies of your unique indices. To avoid sorting, you can trivially unravel the index instead of doing the lookup:

nz = np.flatnonzero(vv)  # 1-D array of populated cells (np.nonzero would return a tuple)
out = np.unravel_index(nz, dims, order="F")

And sure enough, you get an output of

>>> vv
array([ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
        0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
        0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0., 14.,  0.,  0.,
        0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
        0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
        0.,  3.,  0.,  0.,  0.,  0.,  0.,  5.,  8.,  0.,  0.,  0.,  0.,
        0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
        0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  6.,
        0.,  0.,  0.,  0.,  8.,  0.,  0.,  0.,  0.,  6.,  0.,  0.,  0.,
        0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
        0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
        0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
        0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
        0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
        0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
        0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
        0.,  4.])
>>> out
(array([1, 1, 2, 3, 3, 3, 3, 4]),
 array([1, 1, 2, 2, 2, 3, 4, 5]),
 array([1, 2, 2, 2, 3, 3, 3, 6]))

Aside from the fact that this scales linearly with size regardless of how many duplicates there are, you are no longer tied to order="F": any ravel and unravel operation will do, as long as they are invertible.
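Two follow-up notes (my additions): the summed weights aligned with out can be read back from the dense buffer, and that buffer holds np.prod(dims) float64 entries, so memory scales with the index ranges rather than with the number of points (this is where the 8 GiB figure in Jérôme Richard's benchmark, which uses indices up to 1000 on each axis, comes from):

vv_unique = vv[nz]                              # weights in the same order as out
mem_bytes = np.prod(dims, dtype=np.int64) * 8   # e.g. 1000**3 entries ~ 8 GB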

Upvotes: 0

Paul Brodersen

Reputation: 13031

To save other people some time: my attempt with sparse.COO was 10% slower for arrays of size 10 M.

import numpy as np
import sparse # pip install sparse

from line_profiler import profile


def original(indices, values, shape):
    idx = np.ravel_multi_index(indices, shape, order='F')
    out, uind, cinv = np.unique(idx, return_index=True, return_inverse=True)
    vv = np.bincount(cinv, weights=values)
    out = tuple(index[uind] for index in indices)
    return vv, *out


def with_sparse(indices, values, shape):
    coo = sparse.COO(indices, values, shape=shape)
    return coo.data, *coo.coords


def test_correctness():
    x = np.array([1, 1, 2, 2, 3, 3, 1, 1, 3, 3, 4], dtype=np.uint16)
    y = np.array([1, 1, 2, 2, 2, 2, 1, 1, 3, 4, 5], dtype=np.uint16)
    l = np.array([1, 2, 2, 2, 3, 2, 1, 1, 3, 3, 6], dtype=np.uint16)
    v = np.array([2, 4, 6, 8, 7, 5, 3, 1, 8, 6, 4], dtype=np.float64)
    indices = (x, y, l)

    shape = [np.amax(index) + 1 for index in indices]

    r1 = original(indices, v, shape)
    r2 = with_sparse(indices, v, shape)
    for a, b in zip(r1, r2):
        assert np.allclose(a, b)


@profile
def test_speed(n=10_000_000):
    indices = np.random.randint(0, 1000, size=(3, n))
    values = np.random.rand(n)
    shape = [np.amax(index) + 1 for index in indices]
    r1 = original(indices, values, shape)
    r2 = with_sparse(indices, values, shape)


if __name__ == "__main__":

    test_correctness()
    test_speed()

Profiler results:

Total time: 6.04991 s
File: sum_of_voxels.py
Function: test_speed at line 35

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    35                                           @profile
    36                                           def test_speed(n=10_000_000):
    37         1     169152.2 169152.2      2.8      indices = np.random.randint(0, 1000, size=(3, n))
    38         1      89515.9  89515.9      1.5      values = np.random.rand(n)
    39         1      17613.5  17613.5      0.3      shape = [np.amax(index) + 1 for index in indices]
    40         1    2727965.6    3e+06     45.1      r1 = original(indices, values, shape)
    41         1    3045661.3    3e+06     50.3      r2 = with_sparse(indices, values, shape)

Upvotes: 1
