Jakub M.

Reputation: 33867

Optimize this function (in C++)

I have CPU-intensive code in which a function containing a loop is executed many times. Every optimization in this loop brings a noticeable performance gain. Question: how would you optimize this loop (there is not much more to optimize, though...)?

void theloop(int64_t in[], int64_t out[], size_t N)
{
    for(uint32_t i = 0; i < N; i++) {
        int64_t v = in[i];
        max += v;
        if (v > max) max = v;
        out[i] = max;
    }
}

I tried a few things, e.g. I replaced the array indexing with pointers that were incremented on every iteration, but (surprisingly) I lost some performance instead of gaining any...

Edit:

Results:

I added some loop unrolling and a nice hack from Alex's post. Below I paste some results:

  1. original: 14.0s
  2. unrolled loop (4 iterations): 10.44s
  3. Alex's trick: 10.89s
  4. 2) and 3) at once: 11.71s

Strange that 4) is not faster than 2) and 3). Below is the code for 4):

for(size_t i = 1; i < N; i+=CHUNK) {
    int64_t t_in0 = in[i+0];
    int64_t t_in1 = in[i+1];
    int64_t t_in2 = in[i+2];
    int64_t t_in3 = in[i+3];


    max &= -max >> 63;
    max += t_in0;
    out[i+0] = max;

    max &= -max >> 63;
    max += t_in1;
    out[i+1] = max;

    max &= -max >> 63;
    max += t_in2;
    out[i+2] = max;

    max &= -max >> 63;
    max += t_in3;
    out[i+3] = max;

}

Upvotes: 7

Views: 1277

Answers (8)

sehe

Reputation: 393759

> **Announcement**: see [chat](https://chat.stackoverflow.com/rooms/5056/discussion-between-sehe-and-jakub-m)
>
> _Hi Jakub, what would you say if I have found a version that uses a heuristic optimization that, for random data distributed uniformly, will result in a ~3.2x speed increase for `int64_t` (10.56x effective using `float`s)?_
>
> I have yet to find the time to update the post, but the explanation and code can be found through the chat.
>
> I used the same test-bed code (below) to verify that the results are correct and exactly match the original implementation from your OP.
>
> **Edit**: ironically... that testbed had a fatal flaw, which rendered the results invalid: the heuristic version was in fact skipping parts of the input, but because existing output wasn't being cleared, it appeared to have the correct output... (still editing...)


Ok, I have published a benchmark based on your code versions, and also my proposed use of partial_sum.

Find all the code here https://gist.github.com/1368992#file_test.cpp

Features

For a default config of

#define MAGNITUDE     20
#define ITERATIONS    1024
#define VERIFICATION  1
#define VERBOSE       0

#define LIMITED_RANGE 0    // hide difference in output due to absence of overflows
#define USE_FLOATS    0

It will (see output fragment here):

  • run 100 x 1024 iterations (i.e. 100 different random seeds)
  • for data length 1048576 (2^20).
  • The random input data is uniformly distributed over the full range of the element data type (int64_t)
  • Verify output by generating a hash digest of the output array and comparing it to the reference implementation from the OP.
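
The verification step can be sketched roughly as follows. This is a minimal illustration, not the code from the gist: the names digest, reference_loop and matches_reference are hypothetical, FNV-1a is an assumed stand-in for whatever hash the benchmark really uses, and the running value is assumed to start at 0. Both buffers are zeroed before each run, so a candidate that skips part of the input cannot pass on stale output.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper: FNV-1a over the bytes of the output array.
// The hash used by the real benchmark is not known here; this is an assumption.
inline std::uint64_t digest(const std::vector<std::int64_t>& out) {
    std::uint64_t h = 14695981039346656037ull;  // FNV-1a 64-bit offset basis
    for (std::int64_t v : out) {
        const std::uint64_t u = static_cast<std::uint64_t>(v);
        for (int b = 0; b < 8; ++b) {
            h ^= (u >> (8 * b)) & 0xffu;
            h *= 1099511628211ull;  // FNV-1a 64-bit prime
        }
    }
    return h;
}

// Reference: the loop from the question, with max as a local starting at 0 (assumed).
inline void reference_loop(const std::int64_t* in, std::int64_t* out, std::size_t n) {
    assert(n == 0 || (in != nullptr && out != nullptr));
    std::int64_t max = 0;
    for (std::size_t i = 0; i < n; ++i) {
        const std::int64_t v = in[i];
        max += v;
        if (v > max) max = v;
        out[i] = max;
    }
}

// Runs a candidate on a freshly zeroed buffer and compares digests with the reference.
template <class Candidate>
bool matches_reference(Candidate candidate, const std::vector<std::int64_t>& in) {
    std::vector<std::int64_t> expected(in.size(), 0);
    std::vector<std::int64_t> actual(in.size(), 0);
    reference_loop(in.data(), expected.data(), in.size());
    candidate(in.data(), actual.data(), in.size());
    return digest(expected) == digest(actual);
}
```

Any function or lambda taking (const int64_t*, int64_t*, size_t) can be passed as the candidate.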

Results

There are a number of (surprising or unsurprising) results:

  1. There is no significant performance difference between any of the algorithms whatsoever (for integer data), provided you are compiling with optimizations enabled. (See Makefile; my arch is 64-bit, Intel Core Q9550 with gcc-4.6.1)

  2. The algorithms are not equivalent (you'll see hash sums differ): notably the bit fiddle proposed by Alex doesn't handle integer overflow in quite the same way (this can be hidden by defining

    #define LIMITED_RANGE 1
    

    which limits the input data so overflows won't occur; note that the partial_sum_incorrect version shows equivalent C++ non-bitwise arithmetic operations that yield the same (differing) results:

    return max<0 ? v :  max + v; 
    

    Perhaps, it is ok for your purpose?)

  3. Surprisingly, it is not more expensive to calculate both definitions of the max algorithm at once. You can see this being done inside partial_sum_correct: it calculates both 'formulations' of max in the same loop. This is really not more than trivia here, because neither of the two methods is significantly faster...

  4. Even more surprisingly, a big performance boost can be had when you are able to use float instead of int64_t. A quick and dirty hack can be applied to the benchmark

    #define USE_FLOATS    1
    

    showing that the STL based algorithm (partial_sum_incorrect) runs approximately 2.5x faster when using float instead of int64_t (!!!).
    Note:

    • that the naming of partial_sum_incorrect only relates to integer overflow, which doesn't apply to floats; this can be seen from the fact that the hashes match up, so in fact it is partial_sum_float_correct :)
    • that the current implementation of partial_sum_correct is doing double work that causes it to perform badly in floating point mode. See bullet 3.
  5. (And there was that off-by-1 bug in the loop-unrolled version from the OP I mentioned before)

Partial sum

For your interest, the partial sum application looks like this in C++11:

std::partial_sum(data.begin(), data.end(), output.begin(), 
        [](int64_t max, int64_t v) -> int64_t
        { 
            max += v;
            if (v > max) max = v;
            return max;
        });
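
For experimenting, the same call can be wrapped in a small self-contained function. This is only a sketch: the name running_max_sum is hypothetical, and it relies on std::partial_sum copying the first element unchanged, which matches the original loop only if the running value starts at 0 (an assumption, since the question does not show how max is initialized).

```cpp
#include <cassert>
#include <cstdint>
#include <numeric>
#include <vector>

// Applies the recurrence from the question through std::partial_sum.
// The first output element is a plain copy of the first input element.
inline std::vector<std::int64_t> running_max_sum(const std::vector<std::int64_t>& data) {
    std::vector<std::int64_t> output(data.size());
    std::partial_sum(data.begin(), data.end(), output.begin(),
        [](std::int64_t max, std::int64_t v) -> std::int64_t {
            max += v;
            if (v > max) max = v;
            return max;
        });
    return output;
}
```

For the input {3, -5, 2, 4, -1} this yields {3, -2, 2, 6, 5}: the running value drops to -2 after the second element, so the third element restarts it at 2.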

Upvotes: 7

paper.plane

Reputation: 1197

int64_t max = 0, i;

for(i=N-1; i > 0; --i) /* Comparing with 0 is faster */  
{  
    max = in[i] > 0 ? max+in[i] : in[i];
    out[i] = max;

    --i;  /* Will reduce checking of i>=0 by N/2 times */  

    max = in[i] > 0 ? max+in[i] : in[i]; /* Reduce operations v=in[i], max+=v by N times */  
    out[i] = max;         
}

if(0 == i) /* When N is odd */
{ 
    max = in[i] > 0 ? max+in[i] : in[i]; 
    out[i] = max;
}

Upvotes: -3

Matthieu M.

Reputation: 300229

Sometimes, you need to step back and look at the problem again. The first question is obviously: do you need this? Could there be an alternative algorithm that would perform better?

That being said, and supposing for the sake of this question that you already settled on this algorithm, we can try and reason about what we actually have.

Disclaimer: the method I am describing is inspired by the successful method Tim Peters used to improve the traditional introsort implementation, leading to TimSort. So please bear with me ;)

1. Extracting Properties

The main issue I can see is the dependency between iterations, which will prevent many of the possible optimizations and thwart many attempts at parallelizing.

int64_t v = in[i];
max += v;
if (v > max) max = v;
out[i] = max;

Let us rework this code in a functional fashion:

max = calc(in[i], max);
out[i] = max;

Where:

int64_t calc(int64_t const in, int64_t const max) {
  int64_t const bumped = max + in;
  return in > bumped ? in : bumped;
}

Or rather, a simplified version (barring overflow since it's undefined):

int64_t calc(int64_t const in, int64_t const max) {
  return 0 > max ? in : max + in;
}
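
The two formulations agree because in > max + in holds exactly when max < 0, as long as the addition does not overflow. A small sketch can check this over a range where overflow is impossible; the names calc_bumped, calc_sign and formulations_agree are hypothetical and exist only for this illustration.

```cpp
#include <cassert>
#include <cstdint>

// First formulation: add, then keep the larger of in and the sum.
inline std::int64_t calc_bumped(std::int64_t in, std::int64_t max) {
    const std::int64_t bumped = max + in;
    return in > bumped ? in : bumped;
}

// Simplified formulation: only the sign of max matters.
inline std::int64_t calc_sign(std::int64_t in, std::int64_t max) {
    return 0 > max ? in : max + in;
}

// Hypothetical helper: compares both formulations for every pair in [lo, hi].
// The range must be small enough that max + in cannot overflow.
inline bool formulations_agree(std::int64_t lo, std::int64_t hi) {
    assert(lo <= hi);
    for (std::int64_t max = lo; max <= hi; ++max)
        for (std::int64_t in = lo; in <= hi; ++in)
            if (calc_bumped(in, max) != calc_sign(in, max)) return false;
    return true;
}
```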

Do you notice the tipping point? The behavior changes depending on whether the ill-named(*) max is positive or negative.

This tipping point makes it interesting to watch the values in the in array more closely, especially according to the effect they might have on max:

  • max < 0 and in[i] < 0 then out[i] = in[i] < 0
  • max < 0 and in[i] > 0 then out[i] = in[i] > 0
  • max > 0 and in[i] < 0 then out[i] = (max + in[i]) ?? 0
  • max > 0 and in[i] > 0 then out[i] = (max + in[i]) > 0

(*) ill-named because it is also an accumulator, which the name hides. I have no better suggestion though.

2. Optimizing operations

This leads us to discover interesting cases:

  • if we have a slice [i, j) of the array containing only negative values (which we call negative slice), then we could do a std::copy(in + i, in + j, out + i) and max = out[j-1]
  • if we have a slice [i, j) of the array containing only positive values, then it's a pure accumulation code (which can easily be unrolled)
  • max gets positive as soon as in[i] is positive

Therefore, it could be interesting (but maybe not, I make no promise) to establish a profile of the input before actually working with it. Note that the profile could be made chunk by chunk for large inputs, for example tuning the chunk size based on the cache line size.

For reference, the 3 routines:

void copy(int64_t const in[], int64_t out[],
          size_t const begin, size_t const end)
{
  std::copy(in + begin, in + end, out + begin);
} // copy

void accumulate(int64_t const in[], int64_t out[],
                size_t const begin, size_t const end)
{
  assert(begin != 0);

  int64_t max = out[begin-1];

  for (size_t i = begin; i != end; ++i) {
    max += in[i];
    out[i] = max;
  }
} // accumulate

void regular(int64_t const in[], int64_t out[],
             size_t const begin, size_t const end)
{
  assert(begin != 0);

  int64_t max = out[begin - 1];

  for (size_t i = begin; i != end; ++i)
  {
    max = 0 > max ? in[i] : max + in[i];
    out[i] = max;
  }
}

Now, supposing that we can somehow characterize the input using a simple structure:

struct Slice {
  enum class Type { Negative, Neutral, Positive };
  Type type;
  size_t begin;
  size_t end;
};

typedef void (*Func)(int64_t const[], int64_t[], size_t, size_t);

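Func select(Slice::Type t) {
  switch(t) {
  case Slice::Type::Negative: return &copy;
  case Slice::Type::Neutral: return &regular;
  case Slice::Type::Positive: return &accumulate;
  }
  return &regular; // not reached for valid enumerators; avoids a missing-return warning
}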

void theLoop(std::vector<Slice> const& slices, int64_t const in[], int64_t out[]) {
  for (Slice const& slice: slices) {
    Func const f = select(slice.type);
    (*f)(in, out, slice.begin, slice.end);
  }
}

Now, unlike with introsort, the work in the loop is minimal, so computing the characteristics might be too costly as is... however it lends itself well to parallelization.
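
A characterizer for one slice might look like the sketch below. The names SliceType and characterize are hypothetical (the enum mirrors Slice::Type so the block stands alone), and the function only inspects signs. Whether the copy or accumulate shortcut is exact for the first element of a slice also depends on the sign of the running value entering that slice, which this sketch does not track.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Mirrors Slice::Type; redeclared here so the sketch is self-contained.
enum class SliceType { Negative, Neutral, Positive };

// Hypothetical characterizer for in[begin, end).
// Negative: every value < 0. Positive: every value > 0. Neutral: anything else.
inline SliceType characterize(const std::int64_t in[], std::size_t begin, std::size_t end) {
    assert(begin < end);
    bool all_negative = true;
    bool all_positive = true;
    for (std::size_t i = begin; i != end; ++i) {
        all_negative = all_negative && in[i] < 0;
        all_positive = all_positive && in[i] > 0;
    }
    if (all_negative) return SliceType::Negative;
    if (all_positive) return SliceType::Positive;
    return SliceType::Neutral;
}
```

The cost is one extra pass over the chunk, which is why chunk size matters: it only pays off if enough chunks turn out to be purely negative or purely positive.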

3. Simple parallelization

Note that the characterization is a pure function of the input. Therefore, supposing that you work in a chunk-by-chunk fashion, it could be possible to have, in parallel:

  • Slice Producer: a characterizer thread, which computes the Slice::Type value
  • Slice Consumer: a worker thread, which actually executes the code

Even if the input is essentially random, provided the chunk is small enough (for example, a CPU L1 cache line), there might be chunks for which it does work. Synchronization between the two threads can be done with a simple thread-safe queue of Slice (producer/consumer), adding a bool last attribute to stop consumption, or by creating the Slice in a vector with an Unknown type and having the consumer block until it's known (using atomics).

Note: because characterization is pure, it's embarrassingly parallel.

4. More Parallelization: Speculative work

Remember this innocent remark: max gets positive as soon as in[i] is positive.

Suppose that we can guess (reliably) that Slice[j-1] will produce a max value that is negative; then the computations on Slice[j] are independent of what preceded them, and we can start the work right now!

Of course, it's a guess, so we might be wrong... but once we have fully characterized all the Slices, we have idle cores, so we might as well use them for speculative work! And if we're wrong? Well, the Consumer thread will simply gently erase our mistake and replace it with the correct value.

The heuristic to speculatively compute a Slice should be simple, and it will have to be tuned. It may be adaptive as well... but that may be more difficult!
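
The independence property behind this speculation can be checked in a single thread. In the sketch below, run_slice and speculation_holds are hypothetical names, and the sequential run assumes the running value starts at 0. A speculative run starts from an arbitrary negative value, because with a negative incoming value the first output is simply in[begin].

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// The recurrence, run over in[begin, end) with a given incoming value.
inline void run_slice(const std::vector<std::int64_t>& in, std::vector<std::int64_t>& out,
                      std::size_t begin, std::size_t end, std::int64_t max) {
    assert(begin <= end && end <= in.size() && out.size() == in.size());
    for (std::size_t i = begin; i != end; ++i) {
        max = 0 > max ? in[i] : max + in[i];
        out[i] = max;
    }
}

// Hypothetical check: does a speculative run of [begin, end), which assumed a
// negative incoming value, match the sequential result on that range?
inline bool speculation_holds(const std::vector<std::int64_t>& in,
                              std::size_t begin, std::size_t end) {
    std::vector<std::int64_t> sequential(in.size(), 0);
    std::vector<std::int64_t> speculative(in.size(), 0);
    run_slice(in, sequential, 0, end, 0);        // ground truth from the start
    run_slice(in, speculative, begin, end, -1);  // any negative start behaves the same
    for (std::size_t i = begin; i != end; ++i)
        if (sequential[i] != speculative[i]) return false;
    return true;
}
```

With in = {4, -9, 3, 2, -1}, the running value before index 2 is -5, so speculating on [2, 5) succeeds; before index 1 it is 4, so speculating on [1, 5) fails and would have to be redone.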

Conclusion

Analyze your dataset and try to find out whether it's possible to break dependencies. If it is, you can probably take advantage of it, even without going multi-threaded.

Upvotes: 5

Stack Overflow is garbage

Reputation: 248219

First, you need to look at the generated assembly. Otherwise you have no way of knowing what actually happens when this loop is executed.

Now: is this code running on a 64-bit machine? If not, those 64-bit additions might hurt a bit.

This loop seems an obvious candidate for using SIMD instructions. SSE2 supports a number of SIMD instructions for integer arithmetic, including some that work on two 64-bit values.

Other than that, see if the compiler properly unrolls the loop, and if not, do so yourself. Unroll a couple of iterations of the loop, and then reorder the hell out of it. Put all the memory loads at the top of the loop, so they can be started as early as possible.
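
A manual unroll along these lines could look like the following sketch. The name theloop_unrolled is hypothetical and max is assumed to start at 0. The four loads sit at the top of the body, and a scalar tail loop handles the last n % 4 elements so that no index runs past n.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Unrolled by 4 with the loads hoisted; the dependency on max remains serial.
inline void theloop_unrolled(const std::int64_t* in, std::int64_t* out, std::size_t n) {
    assert(n == 0 || (in != nullptr && out != nullptr));
    std::int64_t max = 0;  // assumed starting value
    std::size_t i = 0;
    for (; i + 4 <= n; i += 4) {
        const std::int64_t v0 = in[i + 0];
        const std::int64_t v1 = in[i + 1];
        const std::int64_t v2 = in[i + 2];
        const std::int64_t v3 = in[i + 3];

        max += v0; if (v0 > max) max = v0; out[i + 0] = max;
        max += v1; if (v1 > max) max = v1; out[i + 1] = max;
        max += v2; if (v2 > max) max = v2; out[i + 2] = max;
        max += v3; if (v3 > max) max = v3; out[i + 3] = max;
    }
    for (; i < n; ++i) {  // tail: fewer than 4 elements left
        const std::int64_t v = in[i];
        max += v; if (v > max) max = v; out[i] = max;
    }
}
```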

For the if line, check that the compiler is generating a conditional move, rather than a branch.

Finally, see if your compiler supports something like the restrict/__restrict keyword. It's not standard in C++, but it is very useful for indicating to the compiler that in and out do not point to the same addresses.
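
As a sketch of the no-aliasing hint, the version below uses the __restrict spelling, a compiler extension accepted by GCC, Clang and MSVC. The name theloop_restrict is hypothetical and max is assumed to start at 0. The update is also written as a ternary, a form compilers often turn into a conditional move, though only the disassembly can confirm that.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// __restrict promises the compiler that in and out never overlap.
// Calling this with overlapping buffers is undefined behavior.
inline void theloop_restrict(const std::int64_t* __restrict in,
                             std::int64_t* __restrict out,
                             std::size_t n) {
    assert(n == 0 || in != out);  // cheap sanity check, not a full overlap test
    std::int64_t max = 0;         // assumed starting value
    for (std::size_t i = 0; i < n; ++i) {
        const std::int64_t v = in[i];
        const std::int64_t sum = max + v;
        max = v > sum ? v : sum;  // same result as: max += v; if (v > max) max = v;
        out[i] = max;
    }
}
```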

Is the size (N) known at compile-time? If so, make it a template parameter (and then try passing in and out as references to properly-sized arrays, as this may also help the compiler with aliasing analysis)
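
If N really is a compile-time constant, a sketch of the template variant could be as follows. The name theloop_fixed is hypothetical and max is assumed to start at 0. N is deduced from the array type, so the call site passes no length.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// in and out are references to arrays of exactly N elements,
// so the trip count is known to the compiler.
template <std::size_t N>
void theloop_fixed(const std::int64_t (&in)[N], std::int64_t (&out)[N]) {
    std::int64_t max = 0;  // assumed starting value
    for (std::size_t i = 0; i < N; ++i) {
        const std::int64_t v = in[i];
        max += v;
        if (v > max) max = v;
        out[i] = max;
    }
}
```

Each distinct N instantiates its own copy of the function, so this mainly makes sense when only a few sizes are used.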

Just some thoughts off the top of my head. But again, study the disassembly. You need to know what the compiler does for you, and especially, what it doesn't do for you.

Edit

With your edit:

max &= -max >> 63;
max += t_in0;
out[i+0] = max;

what strikes me is that you added a huge dependency chain. Before the result can be computed, max must be negated, the result must be shifted, the result of that must be and'ed together with its original value, and the result of that must be added to another variable.

In other words, all these operations have to be serialized. You can't start one of them before the previous has finished. That's not necessarily a speedup. Modern pipelined out-of-order CPUs like to execute lots of things in parallel. Tying it up with a single long chain of dependent instructions is one of the most crippling things you can do. (Of course, if it can be interleaved with other iterations, it might work out better. But my gut feeling is that a simple conditional move instruction would be preferable)

Upvotes: 15

Alexey Frunze

Reputation: 62106

If values of max and in[] are far away from 64-bit min/max (say, they are always between -2^61 and +2^61), you may try a loop without the conditional branch, which may be causing some perf degradation:

for(uint32_t i = 1; i < N; i++) {
    max &= -max >> 63; // assuming >> would do arithmetic shift with sign extension
    max += in[i];
    out[i] = max;
}

In theory the compiler may do a similar trick as well, but without seeing the disassembly, it's hard to tell if it does it.
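
To see why the trick works: -max >> 63 is all ones when max is positive and zero otherwise, so the AND keeps a positive max and clears anything else to 0. The sketch below compares one step of each variant on small values; the function names are hypothetical, and it assumes an arithmetic right shift (guaranteed since C++20, implementation-defined but usual before that) and values far from INT64_MIN.

```cpp
#include <cassert>
#include <cstdint>

// Branch-free clamp: returns max if it is positive, otherwise 0.
inline std::int64_t clamp_to_nonnegative(std::int64_t max) {
    return max & (-max >> 63);
}

// One step of the branch-free loop body.
inline std::int64_t step_bitwise(std::int64_t max, std::int64_t v) {
    return clamp_to_nonnegative(max) + v;
}

// One step of the loop body from the question.
inline std::int64_t step_branchy(std::int64_t max, std::int64_t v) {
    max += v;
    if (v > max) max = v;
    return max;
}

// Hypothetical helper: compares both steps for every pair in [lo, hi].
inline bool steps_agree(std::int64_t lo, std::int64_t hi) {
    assert(lo <= hi);
    for (std::int64_t max = lo; max <= hi; ++max)
        for (std::int64_t v = lo; v <= hi; ++v)
            if (step_bitwise(max, v) != step_branchy(max, v)) return false;
    return true;
}
```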

Upvotes: 4

Will

Reputation: 75673

Ensuring the method isn't virtual, marking it inline or __attribute__((always_inline)), and compiling with -funroll-loops all seem like good options to explore.

Only by you benchmarking them can we determine if they were worthwhile optimizations in your bigger program.

Upvotes: 1

Francesco

Reputation: 3250

The code already appears pretty fast. Depending on the nature of the in array, you could try special casing: for instance, if you happen to know that in a particular invocation all the input numbers are positive, out[i] will be equal to the cumulative sum, with no need for an if branch.
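
A sketch of that special case is shown below. The name theloop_special_cased is hypothetical and max is assumed to start at 0. Here the all-positive property is detected with an extra pass through std::all_of, purely for illustration; in practice that knowledge would ideally come from the caller for free.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <numeric>
#include <vector>

// Plain prefix sum when every input is positive, general recurrence otherwise.
inline std::vector<std::int64_t> theloop_special_cased(const std::vector<std::int64_t>& in) {
    std::vector<std::int64_t> out(in.size());
    const bool all_positive = std::all_of(in.begin(), in.end(),
                                          [](std::int64_t v) { return v > 0; });
    if (all_positive) {
        std::partial_sum(in.begin(), in.end(), out.begin());  // no comparison per element
        return out;
    }
    std::int64_t max = 0;  // assumed starting value
    for (std::size_t i = 0; i < in.size(); ++i) {
        max += in[i];
        if (in[i] > max) max = in[i];
        out[i] = max;
    }
    return out;
}
```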

Upvotes: 1

SmacL

Reputation: 22932

The only thing that comes to mind that might help a small bit is to use pointers rather than array indices within your loop, something like

void theloop(int64_t in[], int64_t out[], size_t N)
{
    int64_t max = in[0];
    out[0] = max;
    int64_t *ip = in + 1,*op = out+1;

    for(uint32_t i = 1; i < N; i++) {
        int64_t v = *ip; 
        ip++;
        max += v;
        if (v > max) max = v;
        *op = max;
        op++;
    }
}

The thinking here is that an index into an array is liable to compile as taking the base address of the array, multiplying the size of element by the index, and adding the result to get the element address. Keeping running pointers avoids this. I'm guessing a good optimizing compiler will do this already, so you'd need to study the current assembler output.

Upvotes: -1
