Reputation: 33867
I have CPU-consuming code where a function with a loop is executed many times. Every optimization in this loop brings a noticeable performance gain. Question: how would you optimize this loop (there is not much more to optimize, though...)?
void theloop(int64_t in[], int64_t out[], size_t N)
{
    int64_t max = 0; // accumulator; initialization to 0 assumed (declaration missing in the post)
    for (uint32_t i = 0; i < N; i++) {
        int64_t v = in[i];
        max += v;
        if (v > max) max = v;
        out[i] = max;
    }
}
I tried a few things, e.g. I replaced arrays with pointers that were incremented in every loop, but (surprisingly) I lost some performance instead of gaining...
Edit:
- the function computes running maximums (the name is an error)
- values are int64_t, so they are negative and positive
- N is unknown at compile time
- I tried SSE (_m128i), but the overhead of loading, executing and storing back was higher than the SSE speed gain. Yet I am not an expert on SSE, so maybe I had a poor code.

Results:
I added some loop unrolling, and a nice hack from Alex's post. Below I paste some results:

Strange that 4) is not faster than 3). Below is the code for 4):
for (size_t i = 1; i < N; i += CHUNK) {
    int64_t t_in0 = in[i+0];
    int64_t t_in1 = in[i+1];
    int64_t t_in2 = in[i+2];
    int64_t t_in3 = in[i+3];

    max &= -max >> 63;
    max += t_in0;
    out[i+0] = max;

    max &= -max >> 63;
    max += t_in1;
    out[i+1] = max;

    max &= -max >> 63;
    max += t_in2;
    out[i+2] = max;

    max &= -max >> 63;
    max += t_in3;
    out[i+3] = max;
}
Upvotes: 7
Views: 1277
Reputation: 393759
> # **Announcement**: see [chat](https://chat.stackoverflow.com/rooms/5056/discussion-between-sehe-and-jakub-m)
> > _Hi Jakub, what would you say if I have found a version that uses a heuristic optimization that, for random data distributed uniformly will result in ~3.2x speed increase for `int64_t` (10.56x effective using `float`s)?_
>
I have yet to find the time to update the post, but the explanation and code can be found through the chat. **Edit**: ironically... that testbed had a fatal flaw which rendered the results invalid: the heuristic version was in fact skipping parts of the input, but because the existing output wasn't being cleared, it appeared to have the correct output... (still editing...)
> I used the same test-bed code (below) to verify that the results are correct and exactly match the original implementation from your OP
Ok, I have published a benchmark based on your code versions, and also my proposed use of `partial_sum`.

Find all the code here: https://gist.github.com/1368992#file_test.cpp
For a default config of
#define MAGNITUDE 20
#define ITERATIONS 1024
#define VERIFICATION 1
#define VERBOSE 0
#define LIMITED_RANGE 0 // hide difference in output due to absence of overflows
#define USE_FLOATS 0
it will run the benchmark and verify the outputs (see the output fragment at the gist; numbers are for `int64_t`). There are a number of (surprising or unsurprising) results:

- There is no significant performance difference between any of the algorithms whatsoever (for integer data), provided you are compiling with optimizations enabled. (See the Makefile; my arch is 64-bit, Intel Core Q9550 with gcc-4.6.1.)
- The algorithms are not equivalent (you'll see the hash sums differ): notably, the bit fiddle proposed by Alex doesn't handle integer overflow in quite the same way. (This can be hidden by defining `#define LIMITED_RANGE 1`, which limits the input data so overflows won't occur. Note that the `partial_sum_incorrect` version shows the equivalent C++ non-bitwise arithmetic operations that yield the same different results: `return max<0 ? v : max + v;` Perhaps it is ok for your purpose?)
- Surprisingly, it is not more expensive to calculate both definitions of the max algorithm at once. You can see this being done inside `partial_sum_correct`: it calculates both 'formulations' of max in the same loop. This is really no more than trivia here, because neither of the two methods is significantly faster.
- Even more surprisingly, a big performance boost can be had when you are able to use `float` instead of `int64_t`. A quick and dirty hack can be applied to the benchmark (`#define USE_FLOATS 1`), showing that the STL-based algorithm (`partial_sum_incorrect`) runs approximately 2.5x faster when using `float` instead of `int64_t` (!!!).
Notes:
- the "incorrect" in `partial_sum_incorrect` only relates to integer overflow, which doesn't apply to floats; this can be seen from the fact that the hashes match up, so in float mode it is in fact `partial_sum_float_correct` :)
- `partial_sum_correct` is doing double work that causes it to perform badly in floating-point mode. See bullet 3.
- (And there was that off-by-1 bug in the loop-unrolled version from the OP I mentioned before.)
For your interest, the partial sum application looks like this in C++11:
std::partial_sum(data.begin(), data.end(), output.begin(),
[](int64_t max, int64_t v) -> int64_t
{
max += v;
if (v > max) max = v;
return max;
});
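And, to make that snippet self-contained, a minimal usage sketch (the vector names and sample values are mine, not from the gist):

#include <cstdint>
#include <numeric>
#include <vector>

int main() {
    std::vector<int64_t> data = { 3, -7, 2, 5, -1 };
    std::vector<int64_t> output(data.size());
    // same call as above; output ends up identical to the OP's theloop()
    std::partial_sum(data.begin(), data.end(), output.begin(),
        [](int64_t max, int64_t v) -> int64_t
        {
            max += v;
            if (v > max) max = v;
            return max;
        });
}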
Upvotes: 7
Reputation: 1197
int64_t max = 0, i;
for(i=N-1; i > 0; --i) /* Comparing with 0 is faster */
{
max = in[i] > 0 ? max+in[i] : in[i];
out[i] = max;
--i; /* Will reduce checking of i>=0 by N/2 times */
max = in[i] > 0 ? max+in[i] : in[i]; /* Reduce operations v=in[i], max+=v by N times */
out[i] = max;
}
if(0 == i) /* When N is odd */
{
max = in[i] > 0 ? max+in[i] : in[i];
out[i] = max;
}
Upvotes: -3
Reputation: 300229
Sometimes, you need to step back and look over the problem again. The first question is obviously: do you need this? Could there be an alternative algorithm that would perform better?
That being said, and supposing for the sake of this question that you already settled on this algorithm, we can try and reason about what we actually have.
Disclaimer: the method I am describing is inspired by the successful method Tim Peters used to improve the traditional merge sort implementation, leading to TimSort. So please bear with me ;)
1. Extracting Properties
The main issue I can see is the dependency between iterations, which will prevent much of the possible optimizations and thwart many attempts at parallelizing.
int64_t v = in[i];
max += v;
if (v > max) max = v;
out[i] = max;
Let us rework this code in a functional fashion:
max = calc(in[i], max);
out[i] = max;
Where:
int64_t calc(int64_t const in, int64_t const max) {
int64_t const bumped = max + in;
return in > bumped ? in : bumped;
}
Or rather, a simplified version (barring overflow, since it's undefined behavior):
int64_t calc(int64_t const in, int64_t const max) {
return 0 > max ? in : max + in;
}
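(A throwaway spot-check, my addition, that the simplification agrees with the full version on small, overflow-free values:)

#include <cassert>
#include <cstdint>

int main() {
    for (int64_t max = -4; max <= 4; ++max)
        for (int64_t in = -4; in <= 4; ++in) {
            int64_t const bumped = max + in;                 // full version
            int64_t const full   = in > bumped ? in : bumped;
            int64_t const simple = 0 > max ? in : max + in;  // simplified version
            assert(full == simple);
        }
}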
Do you notice the tipping point? The behavior changes depending on whether the ill-named(*) `max` is positive or negative.
This tipping point makes it interesting to watch the values in `in` more closely, especially according to the effect they might have on `max`:
- if `max < 0` and `in[i] < 0`, then `out[i] = in[i] < 0`
- if `max < 0` and `in[i] > 0`, then `out[i] = in[i] > 0`
- if `max > 0` and `in[i] < 0`, then `out[i] = (max + in[i]) ?? 0` (the sign depends on the magnitudes)
- if `max > 0` and `in[i] > 0`, then `out[i] = (max + in[i]) > 0`
(*) ill-named because it is also an accumulator, which the name hides. I have no better suggestion though.
2. Optimizing operations
This leads us to discover interesting cases:

- if we spot a slice `[i, j)` of the array containing only negative values (which we call a negative slice), then we could do a `std::copy(in + i, in + j, out + i)` and set `max = out[j-1]`
- if we spot a slice `[i, j)` of the array containing only positive values, then it's pure accumulation code (which can easily be unrolled)
- `max` gets positive as soon as `in[i]` is positive

Therefore, it could be interesting (but maybe not, I make no promise) to establish a profile of the input before actually working with it. Note that the profile could be made chunk by chunk for large inputs, for example tuning the chunk size based on the cache line size.
For reference, the three routines:
void copy(int64_t const in[], int64_t out[],
size_t const begin, size_t const end)
{
std::copy(in + begin, in + end, out + begin);
} // copy
void accumulate(int64_t const in[], int64_t out[],
size_t const begin, size_t const end)
{
assert(begin != 0);
int64_t max = out[begin-1];
for (size_t i = begin; i != end; ++i) {
max += in[i];
out[i] = max;
}
} // accumulate
void regular(int64_t const in[], int64_t out[],
size_t const begin, size_t const end)
{
assert(begin != 0);
int64_t max = out[begin - 1];
for (size_t i = begin; i != end; ++i)
{
max = 0 > max ? in[i] : max + in[i];
out[i] = max;
}
}
Now, supposing that we can somehow characterize the input using a simple structure:
struct Slice {
enum class Type { Negative, Neutral, Positive };
Type type;
size_t begin;
size_t end;
};
typedef void (*Func)(int64_t const[], int64_t[], size_t, size_t);
Func select(Slice::Type t) {
    switch(t) {
    case Slice::Type::Negative: return &copy;
    case Slice::Type::Neutral:  return &regular;
    case Slice::Type::Positive: return &accumulate;
    }
    return nullptr; // unreachable: all enumerators handled above
}
void theLoop(std::vector<Slice> const& slices, int64_t const in[], int64_t out[]) {
for (Slice const& slice: slices) {
Func const f = select(slice.type);
(*f)(in, out, slice.begin, slice.end);
}
}
Now, unlike introsort, the work in the loop is minimal, so computing the characteristics might be too costly as is... however it lends itself well to parallelization.
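For illustration only, a minimal sketch of what such a characterization pass could look like; the `characterize` name and the greedy splitting into same-signed runs are my assumptions, and a real profile would still have to check the incoming `max` at each run boundary (or downgrade the run to `Neutral`) before dispatching to `copy`/`accumulate`:

#include <cstddef>
#include <vector>

std::vector<Slice> characterize(int64_t const in[], size_t N) {
    std::vector<Slice> slices;
    size_t begin = 0;
    while (begin != N) {
        bool const negative = in[begin] < 0;
        // extend the run while the sign stays the same
        size_t end = begin + 1;
        while (end != N && (in[end] < 0) == negative) ++end;
        slices.push_back(Slice{ negative ? Slice::Type::Negative
                                         : Slice::Type::Positive,
                                begin, end });
        begin = end;
    }
    return slices;
}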
3. Simple parallelization
Note that the characterization is a pure function of the input. Therefore, supposing that you work in a chunk-by-chunk fashion, it could be possible to have, in parallel:

- a producer thread computing the `Slice::Type` value of each chunk
- a consumer thread executing the appropriate routine for each characterized chunk

Even if the input is essentially random, providing the chunk is small enough (for example, a CPU L1 cache line) there might be chunks for which it does work. Synchronization between the two threads can be done with a simple thread-safe queue of `Slice` (producer/consumer), adding a `bool last` attribute to stop consumption, or by creating the `Slice`s in a vector with an `Unknown` type and having the consumer block until it's known (using atomics); a minimal sketch of such a queue follows the note below.
Note: because characterization is pure, it's embarrassingly parallel.
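A minimal sketch of such a thread-safe queue (my naming; the `bool last` flag mentioned above would ride along on `Slice` itself):

#include <condition_variable>
#include <mutex>
#include <queue>

struct SliceQueue {
    void push(Slice const& s) {
        {
            std::lock_guard<std::mutex> lock(m_);
            q_.push(s);
        }
        cv_.notify_one();
    }
    Slice pop() { // blocks until a Slice is available
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return !q_.empty(); });
        Slice s = q_.front();
        q_.pop();
        return s;
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<Slice> q_;
};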
4. More Parallelization: Speculative work
Remember this innocent remark: `max` gets positive as soon as `in[i]` is positive.
Suppose that we can guess (reliably) that `Slice[j-1]` will produce a `max` value that is negative; then the computations on `Slice[j]` are independent of what preceded them, and we can start the work right now!
Of course, it's a guess, so we might be wrong... but once we have fully characterized all the Slices, we have idle cores, so we might as well use them for speculative work! And if we're wrong? Well, the consumer thread will simply, gently, erase our mistake and replace it with the correct value.
The heuristic to speculatively compute a `Slice` should be simple, and it will have to be tuned. It may be adaptive as well... but that may be more difficult!
Conclusion
Analyze your dataset and try to find out if it's possible to break dependencies. If it is, you can probably take advantage of it, even without going multi-threaded.
Upvotes: 5
Reputation: 248219
First, you need to look at the generated assembly. Otherwise you have no way of knowing what actually happens when this loop is executed.
Now: is this code running on a 64-bit machine? If not, those 64-bit additions might hurt a bit.
This loop seems an obvious candidate for using SIMD instructions. SSE2 supports a number of SIMD instructions for integer arithmetics, including some that work on two 64-bit values.
Other than that, see if the compiler properly unrolls the loop, and if not, do so yourself. Unroll a couple of iterations of the loop, and then reorder the hell out of it. Put all the memory loads at the top of the loop, so they can be started as early as possible.
For the `if` line, check that the compiler generates a conditional move rather than a branch.
Finally, see if your compiler supports something like the `restrict`/`__restrict` keyword. It's not standard in C++, but it is very useful for indicating to the compiler that `in` and `out` do not point to the same addresses.
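A sketch of what that could look like (`__restrict` is a compiler extension, not standard C++; GCC, Clang and MSVC all accept this spelling):

#include <cstddef>
#include <cstdint>

void theloop(int64_t const* __restrict in, int64_t* __restrict out, size_t N)
{
    // promise to the compiler: in and out never alias
    int64_t max = 0;
    for (size_t i = 0; i < N; ++i) {
        int64_t const v = in[i];
        max += v;
        if (v > max) max = v;
        out[i] = max;
    }
}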
Is the size (`N`) known at compile time? If so, make it a template parameter (and then try passing `in` and `out` as references to properly-sized arrays, as this may also help the compiler with aliasing analysis).
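That could look something like this (a sketch; per the OP's edit, `N` is in fact not known at compile time, so this covers only the hypothetical case):

#include <cstddef>
#include <cstdint>

template <size_t N>
void theloop(int64_t const (&in)[N], int64_t (&out)[N])
{
    // the trip count is now a compile-time constant;
    // sized array references may also aid the compiler's aliasing analysis
    int64_t max = 0;
    for (size_t i = 0; i < N; ++i) {
        int64_t const v = in[i];
        max += v;
        if (v > max) max = v;
        out[i] = max;
    }
}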
Just some thoughts off the top of my head. But again, study the disassembly. You need to know what the compiler does for you, and especially, what it doesn't do for you.
Edit: with your edit:
max &= -max >> 63;
max += t_in0;
out[i+0] = max;
what strikes me is that you added a huge dependency chain. Before the result can be computed, max must be negated, the result must be shifted, the result of that must be and'ed together with its original value, and the result of that must be added to another variable.
In other words, all these operations have to be serialized. You can't start one of them before the previous has finished. That's not necessarily a speedup. Modern pipelined out-of-order CPUs like to execute lots of things in parallel. Tying them up with a single long chain of dependent instructions is one of the most crippling things you can do. (Of course, if it can be interleaved with other iterations, it might work out better. But my gut feeling is that a simple conditional move instruction would be preferable.)
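For comparison, a select-based form (a sketch) that keeps the dependency chain short; many compilers turn the ternary here into a conditional move, though that is not guaranteed:

for (size_t i = 0; i < N; ++i) {
    int64_t const v = in[i];
    int64_t const sum = max + v;
    max = v > sum ? v : sum; // branch-free select at source level, often a cmov
    out[i] = max;
}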
Upvotes: 15
Reputation: 62106
If the values of `max` and `in[]` are far away from the 64-bit min/max (say, they are always between -2^61 and +2^61), you may try a loop without the conditional branch, which may be causing some perf degradation:
for (uint32_t i = 0; i < N; i++) {
    max &= -max >> 63; // assuming >> does arithmetic shift with sign extension
    max += in[i];
    out[i] = max;
}
In theory the compiler may do a similar trick as well, but without seeing the disassembly, it's hard to tell if it does it.
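For what it's worth, the bit fiddle unpacks to this branchy equivalent (assuming arithmetic right shift and `max != INT64_MIN`):

// max &= -max >> 63;
//   max <  0 : -max > 0, so (-max >> 63) == 0  and max is clamped to 0
//   max >  0 : -max < 0, so (-max >> 63) == -1 and max is left alone
//   max == 0 : both sides are 0 either way
if (max < 0) max = 0;
max += in[i];
out[i] = max;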
Upvotes: 4
Reputation: 75673
Ensuring the method isn't virtual, marking it `inline` or `__attribute__((always_inline))`, and building with `-funroll-loops` seem like good options to explore.

Only by benchmarking them can you determine whether they are worthwhile optimizations in your bigger program.
Upvotes: 1
Reputation: 3250
The code already appears pretty fast. Depending on the nature of the `in` array, you could try special casing: for instance, if you happen to know that in a particular invocation all the input numbers are positive, `out[i]` will be equal to the cumulative sum, with no need for an `if` branch.
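That special case is a one-liner (a sketch, assuming every `in[i]` is known to be positive):

#include <numeric>

// when all inputs are positive, the running max is just the prefix sum
std::partial_sum(in, in + N, out);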
Upvotes: 1
Reputation: 22932
The only thing that comes to mind that might help a little is to use pointers rather than array indices within your loop, something like
void theloop(int64_t in[], int64_t out[], size_t N)
{
    int64_t max = in[0];
    out[0] = max;
    int64_t *ip = in + 1, *op = out + 1;
    for (uint32_t i = 1; i < N; i++) {
        int64_t v = *ip;
        ip++;
        max += v;
        if (v > max) max = v;
        *op = max;
        op++;
    }
}
The thinking here is that an index into an array is liable to compile to taking the base address of the array, multiplying the element size by the index, and adding the result to get the element address. Keeping running pointers avoids this. I'm guessing a good optimizing compiler will do this already, so you'd need to study the current assembler output.
Upvotes: -1