UberJumper

Reputation: 21155

Why is this code so slow?

So I have this function used to calculate statistics (min/max/std/mean). It generally runs on a 10,000 by 15,000 matrix, which is stored as a vector<vector<int> > inside the class. Creating and populating the matrix goes very fast, but when it comes down to the statistics part it becomes incredibly slow.

E.g. reading all the pixel values of the GeoTIFF one pixel at a time (which involves a lot of complex math to properly georeference each pixel value to a corresponding point) takes around 30 seconds, but calculating the statistics of the entire matrix takes around 6 minutes.

void CalculateStats()
{
    //OHGOD
    double new_mean = 0;
    double new_standard_dev = 0;

    int new_min = 256;
    int new_max = 0;

    size_t cnt = 0;
    for(size_t row = 0; row < vals.size(); row++)
    {
        for(size_t col = 0; col < vals.at(row).size(); col++)
        {
            double mean_prev = new_mean;
            T value = get(row, col);
            new_mean += (value - new_mean) / (cnt + 1);
            new_standard_dev += (value - new_mean) * (value - mean_prev);

            // find new max/min's
            new_min = value < new_min ? value : new_min;
            new_max = value > new_max ? value : new_max;
            cnt++;
        }
    }

    stats_standard_dev = sqrt(new_standard_dev / (vals.size() * vals.at(0).size()) + 1);
    std::cout << stats_standard_dev << std::endl;
}

Am I doing something horrible here?

EDIT

To respond to the comments, T would be an int.

EDIT 2

I fixed my std algorithm, and here is the final product:

void CalculateStats(const std::vector<double>& ignore_values)
{
    //OHGOD
    double new_mean = 0;
    double new_standard_dev = 0;

    int new_min = 256;
    int new_max = 0;

    int n = 0;
    double delta = 0.0;
    double mean2 = 0.0;

    std::vector<double>::const_iterator ignore_begin = ignore_values.begin();
    std::vector<double>::const_iterator ignore_end = ignore_values.end();

    for(typename std::vector<std::vector<T> >::const_iterator row = vals.begin(), row_end = vals.end(); row != row_end; ++row)
    {
        for(typename std::vector<T>::const_iterator col = row->begin(), col_end = row->end(); col != col_end; ++col)
        {
            // This method of calculation is based on Knuth's algorithm.
            T value = *col;
            if(std::find(ignore_begin, ignore_end, value) != ignore_end)
                continue;
            n++;
            delta = value - new_mean;
            new_mean = new_mean + (delta / n);
            mean2 = mean2 + (delta * (value - new_mean));

            // Find new max/min's.
            new_min = value < new_min ? value : new_min;
            new_max = value > new_max ? value : new_max;
        }
    }
    stats_standard_dev = sqrt(mean2 / (n - 1));
    stats_min = new_min;
    stats_max = new_max;
    stats_mean = new_mean;
}

This still takes ~120-130 seconds, but it's a huge improvement :)!

Upvotes: 5

Views: 1687

Answers (16)

Peter Mortensen

Reputation: 31608

There are far too many calculations in the inner loop:

  1. For the descriptive statistics (mean, standard deviation) the only things required are the sum of the values and the sum of the squared values. From these two sums the mean and standard deviation can be computed after the outer loop (together with a third value, the number of samples - n in your new/updated code). The equations can be derived from the definitions or found on the web, e.g. on Wikipedia. For instance, the mean is just the sum of the values divided by n. For the n version (in contrast to the n-1 version - n is large in this case, so it doesn't matter which one is used) the standard deviation is:
    sqrt(n * sumOfSquaredValue - sumOfValue * sumOfValue) / n

    Thus only two floating point additions and one multiplication are needed in the inner loop. Overflow is not a problem with these sums, as the range for doubles is about 10^308. In particular, you will get rid of the expensive floating point division that the profiling reported in another answer revealed. (A sketch of this approach follows the list.)

  2. A lesser problem is that the minimum and maximum are rewritten on every iteration (the compiler may or may not prevent this). Since the minimum quickly becomes small and the maximum quickly becomes large, only the two comparisons should be needed for the majority of loop iterations: use if statements instead to be sure. Whether it matters can be argued, but it is trivial to do.
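
For illustration, a minimal sketch of the sum/sum-of-squares approach against the question's vals (variable names are mine; needs <cmath>):

double sum = 0.0, sumSq = 0.0;
size_t n = 0;
for (size_t row = 0; row < vals.size(); ++row)
{
    const std::vector<int>& r = vals[row];
    for (size_t col = 0; col < r.size(); ++col)
    {
        double v = r[col];
        sum   += v;      // one addition
        sumSq += v * v;  // one multiplication, one addition
        ++n;
    }
}
double mean    = sum / n;
double std_dev = std::sqrt(n * sumSq - sum * sum) / n;  // the n (population) form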

Upvotes: 3

Pete Kirkham

Reputation: 49331

It's slow because you're benchmarking debug code.

Building and running the code on Windows XP using VS2008:

  • a Release build with the default optimisation level, the code in the OP runs in 2734 ms.
  • a Debug build with the default of no optimisation, the code in the OP runs in a massive 398,531 ms.

In comments below you say you're not using optimisation, and this appears to make a big difference in this case - normally it's less than a factor of ten, but in this case it's over a hundred times slower.

I'm using VS2008 rather than 2005, but it's probably similar:

In the Debug build, there are two range checks on each access, each of which calls std::vector::size() using a non-inlined function call and requires a branch. There is overhead involved both with function calls and with branches.

In the Release build, the compiler optimizes away the range checks ( I don't know whether it just drops them, or does flow analysis based on the limits of the loop ), and the vector access becomes a small amount of inline pointer arithmetic with no branches.

No-one cares how fast the debug build is. You should be unit testing the release build anyway, as that's the build which has to work correctly. Only use the Debug build if you don't get all the information you want when you try to step through the code.


The code as posted runs in < 1.5 seconds on my PC with test data of 15000 x 10000 integers all equal to 42. You report that it's running 230 times slower than that. Are you on a 10 MHz processor?

There are other suggestions for making it faster (such as moving it to use SSE, if all the values are representable using 8-bit types), but there's clearly something else which is making it slow.

On my machine, neither a version which hoisted a reference to the row vector and hoisted the size of the row, nor a version which used iterators, had any measurable benefit (with g++ -O3, using iterators takes 1511 ms repeatably; the hoisted and original versions both take 1485 ms). Not optimising means it runs in 7487 ms (original), 3496 ms (hoisted) or 5331 ms (iterators).

But unless you're running on a very low power device, or are paging, or running non-optimised code with a debugger attached, it shouldn't be this slow, and whatever is making it slow is not likely to be the code you've posted.

(As a side note, if you test it with values with a deviation of zero, your SD comes out as 1.)

Upvotes: 3

Robert L

Reputation: 1947

I have modified the algorithm to get rid of almost all of the floating-point division.

WARNING: UNTESTED CODE!!!

void CalculateStats()
{
    //OHGOD

    double accum_f;
    double accum_sq_f;
    double new_mean = 0;
    double new_standard_dev = 0;

    int new_min = 256;
    int new_max = 0;

    const int oku = 100000000;
    int accum_ichi = 0;
    int accum_oku = 0;
    int accum_sq_ichi = 0;
    int accum_sq_oku = 0;

    size_t cnt = 0;

    size_t v1 = vals.size();

    for(size_t row = 0; row < v1; row++)
    {
        size_t v2 = vals.at(row).size();
        for(size_t col = 0; col < v2; col++)
        {
            T value = get(row, col);
            accum_ichi += value;
            accum_sq_ichi += (value * value);

            // perform carries
            accum_oku += (accum_ichi / oku);
            accum_ichi %= oku;
            accum_sq_oku += (accum_sq_ichi / oku);
            accum_sq_ichi %= oku;

            // find new max/min's
            new_min = value < new_min ? value : new_min;
            new_max = value > new_max ? value : new_max;
            cnt++;
        }
    }

    // now, and only now, do we use floating-point arithmetic
    accum_f = (double)(oku) * (double)(accum_oku) + (double)(accum_ichi);
    accum_sq_f = (double)(oku) * (double)(accum_sq_oku) + (double)(accum_sq_ichi);

    new_mean = accum_f / (double)(cnt);

    // standard deviation formula from Wikipedia
    stats_standard_dev = sqrt((double)(cnt)*accum_sq_f - accum_f*accum_f)/(double)(cnt);        

    std::cout << stats_standard_dev << std::endl;
}

Upvotes: 0

Mike Dunlavey

Reputation: 40689

In the inner loop, you shouldn't be testing size, you shouldn't be doing any divisions, and iterators can also be costly. In fact, some unrolling would be good in there. And, of course, you should pay attention to cache locality.

If you get the loop overhead low enough, it might make sense to do it in separate passes: one to get the sum (which you divide to get the mean), one to get the sum of squares (which you combine with the sum to get the variance), and one to get the min and/or max. The reason is to simplify what is in the inner unrolled loop so the compiler can keep stuff in registers.

I couldn't get the code to compile, so I couldn't pinpoint issues for sure.
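
As a rough sketch of the unrolling idea, here is a sum-only pass (it assumes the row data is contiguous in memory and the length is a multiple of 4; a real version would handle the remainder). The four accumulators break the dependency chain so the compiler can keep them in registers:

double SumRow(const int* p, size_t len)
{
    double s0 = 0.0, s1 = 0.0, s2 = 0.0, s3 = 0.0;
    for (size_t i = 0; i < len; i += 4)
    {
        s0 += p[i];
        s1 += p[i + 1];
        s2 += p[i + 2];
        s3 += p[i + 3];
    }
    return (s0 + s1) + (s2 + s3);
}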

Upvotes: 0

user175727

Reputation:

Coming a bit late to the party here, but a couple of points:

  1. You're effectively doing numerical work here. I don't know much about numerical algorithms, but I know enough to know that references and expert support are often useful. This discussion thread offers some references; and Numerical Recipes is a standard (if dated) work.

  2. If you have the opportunity to redesign your matrix, you want to try using a valarray and slices instead of vectors of vectors; one advantage that immediately comes to mind is that you're guaranteed a flat linear layout, which makes cache pre-fetching and SIMD instructions (if your compiler can use them) more effective.
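
As a sketch of what that buys you (assuming the matrix were stored flat in a std::valarray<double> named m; m and the result names are mine):

#include <valarray>
#include <cmath>

// m: the 10,000 x 15,000 matrix stored flat,
// e.g. std::valarray<double> m(10000 * 15000);
double n    = double(m.size());
double mean = m.sum() / n;
double sd   = std::sqrt(((m - mean) * (m - mean)).sum() / n);  // population form
double mn   = m.min();
double mx   = m.max();

Note that the (m - mean) * (m - mean) expression allocates temporaries, so for 150 million elements an explicit sum/sum-of-squares loop may still win; the flat-layout advantage holds either way.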

Upvotes: 0

Matt Price

Reputation: 45445

I would change how I access the data. Assuming you are using std::vector for your container you could do something like this:

typename vector<vector<T> >::const_iterator row;
typename vector<vector<T> >::const_iterator row_end = vals.end();
for(row = vals.begin(); row != row_end; ++row)
{
    typename vector<T>::const_iterator value;
    typename vector<T>::const_iterator value_end = row->end();
    for(value = row->begin(); value != value_end; ++value)
    {
        double mean_prev = new_mean;
        new_mean += (*value - new_mean) / (cnt + 1);
        new_standard_dev += (*value - new_mean) * (*value - mean_prev);

        // find new max/min's
        new_min = min(*value, new_min);
        new_max = max(*value, new_max);
        cnt++;
    }
}

The advantage of this is that in your inner loop you aren't consulting the outer vector, just the inner one.

If your container type is a list, this will be significantly faster, because the lookup time of get/operator[] is linear for a list and constant for a vector.

Edit: I moved the call to end() out of the loop.

Upvotes: 2

Andrew Bainbridge

Reputation: 4808

I just profiled it. 90% of the execution time was in this line:

new_mean += (value - new_mean) / (cnt + 1);

Upvotes: 8

Michael Krelin - hacker

Reputation: 143269

The first thing I spotted is that you evaluate vals.at(row).size() inside the loop, which obviously doesn't help performance. The same applies to vals.size(), but of course the inner loop is worse. If vals is a vector of vectors, you'd better use iterators, or at least keep a reference to the outer vector (because get() with index parameters surely eats up quite some time as well).

This code sample is supposed to illustrate my intentions ;-)

// (Here TVO and TVI stand for the outer and inner vector types, i.e.
// typedef std::vector<std::vector<T> > TVO; typedef std::vector<T> TVI;)
for(TVO::const_iterator i=vals.begin(),ie=vals.end();i!=ie;++i) {
    for(TVI::const_iterator ii=i->begin(),iie=i->end();ii!=iie;++ii) {
        T value = *ii;
        // the rest
    }
}

Upvotes: 5

unwind

Reputation: 400059

As people have mentioned, it might be get(). If it accesses neighbors, for instance, you will totally smash the cache, which will greatly reduce the performance. You should profile, or just think about the access patterns.
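
For illustration (a generic sketch, not the OP's actual get()): with a row-major flat array, the loop order alone decides whether consecutive reads stay within a cache line.

long long total = 0;

// Cache-friendly: walks memory sequentially.
for (size_t row = 0; row < rows; ++row)
    for (size_t col = 0; col < cols; ++col)
        total += data[row * cols + col];

// Cache-hostile: jumps a whole row (cols elements) on every step.
for (size_t col = 0; col < cols; ++col)
    for (size_t row = 0; row < rows; ++row)
        total += data[row * cols + col];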

Upvotes: 0

Larry Watanabe

Reputation: 10184

If your matrix is stored as a vector of vectors, then in the outer for loop you should directly retrieve the i-th vector, and then operate on that in the inner loop. Try that and see if it improves performance.
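
Something along these lines (a sketch against the question's vals):

for (size_t i = 0; i < vals.size(); ++i)
{
    const std::vector<int>& row = vals[i];  // fetch the i-th row once
    for (size_t j = 0; j < row.size(); ++j)
    {
        int value = row[j];
        // ... accumulate the statistics on value ...
    }
}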

Upvotes: 1

Jim Buck

Reputation: 20724

Move the .size() calls to before each loop, and make sure you are compiling with optimizations turned on.

Upvotes: 1

Zan Lynx

Reputation: 54353

I think that I would rewrite it to use const iterators instead of row and col indexes. I would set up a const const_iterator for row_end and col_end to compare against, just to make certain it wasn't making function calls at the end of every loop iteration.

Upvotes: 0

Glen

Reputation: 22310

Have you tried to profile your code?

You don't even need a fancy profiler. Just stick some debug timing statements in there.

Anything I tell you would just be an educated guess (and probably wrong).

You could be getting lots of cache misses due to the way you're accessing the contents of the vector. You might want to cache the results of the size() calls, but I don't know if that's the issue.
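
For example, a crude timer around the suspect call is enough to start with (a sketch using <ctime>, which predates <chrono>):

#include <ctime>
#include <iostream>

std::clock_t start = std::clock();
CalculateStats();
double ms = 1000.0 * (std::clock() - start) / CLOCKS_PER_SEC;
std::cout << "CalculateStats: " << ms << " ms" << std::endl;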

Upvotes: 29

DVK

Reputation: 129529

You should calculate the sum of the values, the min, the max and the count in the first loop, then calculate the mean in one operation by dividing sum / count, and then in a second loop calculate the sum for the standard deviation.

That would probably be a bit faster.
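
A sketch of that two-pass layout against the question's vals (population form; use n - 1 for the sample form; needs <cmath>):

// First pass: sum, min, max, count.
double sum = 0.0;
size_t n = 0;
int mn = 256, mx = 0;
for (size_t r = 0; r < vals.size(); ++r)
    for (size_t c = 0; c < vals[r].size(); ++c)
    {
        int v = vals[r][c];
        sum += v;
        if (v < mn) mn = v;
        if (v > mx) mx = v;
        ++n;
    }
double mean = sum / n;

// Second pass: accumulate squared deviations from the now-known mean.
double sq = 0.0;
for (size_t r = 0; r < vals.size(); ++r)
    for (size_t c = 0; c < vals[r].size(); ++c)
    {
        double d = vals[r][c] - mean;
        sq += d * d;
    }
double std_dev = std::sqrt(sq / n);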

Upvotes: 7

Tobias

Reputation: 5108

I'm not sure what type vals is, but vals.at(row).size() could take a long time if it itself iterates through the collection. Store that value in a variable; otherwise it could make the algorithm more like O(n³) than O(n²).

Upvotes: 0

mmr

Reputation: 14929

  • First, change your row++ to ++row. A minor thing, but you want speed, so that will help.
  • Second, make your row < vals.size() comparison test against a hoisted const value instead. The compiler doesn't know that vals won't change, so it has to play nice and call size() every time.
  • What is the 'get' method in the middle there? What does it do? That might be your real problem.
  • I'm not too sure about your std dev calculation. Take a look at the Wikipedia page on calculating variance in a single pass (it has a quick explanation of Knuth's algorithm, which is an expansion of a recursion relation).

Upvotes: 4
