Reputation: 11399
I would like to ask if there is a quicker way to do my audio conversion than iterating over all values one by one and dividing each by 32768.
void CAudioDataItem::Convert(const vector<int> &uIntegers, vector<double> &uDoubles)
{
    for (size_t i = 0; i < uIntegers.size(); i++)
    {
        uDoubles[i] = uIntegers[i] / 32768.0;
    }
}
My approach works fine, but it could be quicker. However, I did not find any way to speed it up.
Thank you for the help!
Upvotes: 12
Views: 799
Reputation:
Your function is highly parallelizable. On modern Intel CPUs there are three independent ways to parallelize: instruction-level parallelism (ILP), thread-level parallelism (TLP), and SIMD. I was able to use all three to get big boosts in your function. The results are compiler dependent, though. The boost is much smaller with GCC, since it already vectorizes the function. See the tables of numbers below.
However, the main limiting factor in your function is that its time complexity is only O(n), so there is a drastic drop in efficiency each time the size of the array you're running over crosses a cache-level boundary. Compare with large dense matrix multiplication (GEMM): it is an O(n^3) operation, so if one does things right (using e.g. loop tiling) the cache hierarchy is not a problem, and you can get close to the maximum flops/s even for very large matrices (which seems to indicate that GEMM is one of the things Intel has in mind when designing the CPU). The way to fix this in your case is to find a way to do your function on an L1-cache-sized block right after/before you do a more complex operation (for example one that goes as O(n^2)), and then move on to another L1 block. Of course, I don't know what you're doing, so I can't do that for you.
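A minimal sketch of what such L1 blocking could look like. Everything here is illustrative, not from the question: `process_block` is a hypothetical stand-in for whatever heavier follow-up operation you run on the data, and the block size is a guess tuned for a 32 KB L1.

```cpp
#include <cstddef>
#include <vector>

// BLOCK is chosen so one int block plus one double block fit in a 32 KB L1:
// 2048 * (4 + 8) bytes = 24 KB, leaving some slack for other data.
const std::size_t BLOCK = 2048;

// Hypothetical placeholder for the more expensive operation that follows.
void process_block(double *, std::size_t) { /* heavier O(n^2)-style work */ }

void convert_blocked(const std::vector<int> &in, std::vector<double> &out) {
    for (std::size_t start = 0; start < in.size(); start += BLOCK) {
        std::size_t end = start + BLOCK < in.size() ? start + BLOCK : in.size();
        for (std::size_t i = start; i < end; i++)
            out[i] = in[i] / 32768.0;            // block is now hot in L1
        process_block(&out[start], end - start);  // reuse it immediately
    }
}
```

The point is that the conversion is fused with the heavier work per block, so each chunk is only pulled through the cache hierarchy once.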
ILP is partially done for you by the CPU hardware. However, loop-carried dependencies often limit the ILP, so it usually helps to do loop unrolling to take full advantage of it. For TLP I use OpenMP, and for SIMD I used AVX (the code below works for SSE as well). I used 32-byte-aligned memory and made sure the array length was a multiple of 8 so that no clean-up loop was necessary.
Here are the results from Visual Studio 2012, 64-bit, with AVX and OpenMP (release mode, obviously) on a Sandy Bridge-EP with 4 cores (8 HW threads) at 3.6 GHz. The variable n is the number of items. I repeat the function several times as well, so the total time includes that. The function convert_vec4_unroll2_openmp gives the best results except in the L1 region. You can also clearly see that the efficiency drops significantly each time you move to a new cache level, but even for main memory it's still better.
l1 cache, n 2752, repeat 300000
    convert time 1.34, error 0.000000
    convert_vec4 time 0.16, error 0.000000
    convert_vec4_unroll2 time 0.16, error 0.000000
    convert_vec4_unroll2_openmp time 0.31, error 0.000000
l2 cache, n 21856, repeat 30000
    convert time 1.14, error 0.000000
    convert_vec4 time 0.24, error 0.000000
    convert_vec4_unroll2 time 0.24, error 0.000000
    convert_vec4_unroll2_openmp time 0.12, error 0.000000
l3 cache, n 699072, repeat 1000
    convert time 1.23, error 0.000000
    convert_vec4 time 0.44, error 0.000000
    convert_vec4_unroll2 time 0.45, error 0.000000
    convert_vec4_unroll2_openmp time 0.14, error 0.000000
main memory, n 8738144, repeat 100
    convert time 1.56, error 0.000000
    convert_vec4 time 0.95, error 0.000000
    convert_vec4_unroll2 time 0.89, error 0.000000
    convert_vec4_unroll2_openmp time 0.51, error 0.000000
Results with g++ foo.cpp -mavx -fopenmp -ffast-math -O3 on an i5-3317 (Ivy Bridge) at 2.4 GHz, 2 cores (4 HW threads). GCC seems to vectorize this already, and the only benefit comes from OpenMP (which, however, gives a worse result in the L1 region).
l1 cache, n 2752, repeat 300000
    convert time 0.26, error 0.000000
    convert_vec4 time 0.22, error 0.000000
    convert_vec4_unroll2 time 0.21, error 0.000000
    convert_vec4_unroll2_openmp time 0.46, error 0.000000
l2 cache, n 21856, repeat 30000
    convert time 0.28, error 0.000000
    convert_vec4 time 0.27, error 0.000000
    convert_vec4_unroll2 time 0.27, error 0.000000
    convert_vec4_unroll2_openmp time 0.20, error 0.000000
l3 cache, n 699072, repeat 1000
    convert time 0.80, error 0.000000
    convert_vec4 time 0.80, error 0.000000
    convert_vec4_unroll2 time 0.80, error 0.000000
    convert_vec4_unroll2_openmp time 0.83, error 0.000000
main memory, n 8738144, repeat 100
    convert time 1.10, error 0.000000
    convert_vec4 time 1.10, error 0.000000
    convert_vec4_unroll2 time 1.10, error 0.000000
    convert_vec4_unroll2_openmp time 1.00, error 0.000000
Here is the code. I use Agner Fog's vectorclass (http://www.agner.org/optimize/vectorclass.zip) for the SIMD. It will use either AVX to process 4 doubles at once or SSE to process 2 doubles at once.
#include <stdlib.h>
#include <stdio.h>
#include <omp.h>
#include "vectorclass.h"

void convert(const int *uIntegers, double *uDoubles, const int n) {
    for (int i = 0; i < n; i++) {
        uDoubles[i] = uIntegers[i] / 32768.0;
    }
}

void convert_vec4(const int *uIntegers, double *uDoubles, const int n) {
    Vec4d div = 1.0 / 32768;
    for (int i = 0; i < n; i += 4) {
        Vec4i u4i = Vec4i().load(&uIntegers[i]);
        Vec4d u4d = to_double(u4i);
        u4d *= div;
        u4d.store(&uDoubles[i]);
    }
}

void convert_vec4_unroll2(const int *uIntegers, double *uDoubles, const int n) {
    Vec4d div = 1.0 / 32768;
    for (int i = 0; i < n; i += 8) {
        Vec4i u4i_v1 = Vec4i().load(&uIntegers[i]);
        Vec4d u4d_v1 = to_double(u4i_v1);
        u4d_v1 *= div;
        u4d_v1.store(&uDoubles[i]);
        Vec4i u4i_v2 = Vec4i().load(&uIntegers[i+4]);
        Vec4d u4d_v2 = to_double(u4i_v2);
        u4d_v2 *= div;
        u4d_v2.store(&uDoubles[i+4]);
    }
}

void convert_vec4_openmp(const int *uIntegers, double *uDoubles, const int n) {
    #pragma omp parallel for
    for (int i = 0; i < n; i += 4) {
        Vec4i u4i = Vec4i().load(&uIntegers[i]);
        Vec4d u4d = to_double(u4i);
        u4d /= 32768.0;
        u4d.store(&uDoubles[i]);
    }
}

void convert_vec4_unroll2_openmp(const int *uIntegers, double *uDoubles, const int n) {
    Vec4d div = 1.0 / 32768;
    #pragma omp parallel for
    for (int i = 0; i < n; i += 8) {
        Vec4i u4i_v1 = Vec4i().load(&uIntegers[i]);
        Vec4d u4d_v1 = to_double(u4i_v1);
        u4d_v1 *= div;
        u4d_v1.store(&uDoubles[i]);
        Vec4i u4i_v2 = Vec4i().load(&uIntegers[i+4]);
        Vec4d u4d_v2 = to_double(u4i_v2);
        u4d_v2 *= div;
        u4d_v2.store(&uDoubles[i+4]);
    }
}

double compare(double *a, double *b, const int n) {
    double diff = 0.0;
    for (int i = 0; i < n; i++) {
        double tmp = a[i] - b[i];
        if (tmp < 0) tmp *= -1;
        diff += tmp;
    }
    return diff;
}

void loop(const int n, const int repeat, const int ifunc) {
    void (*fp[4])(const int *uIntegers, double *uDoubles, const int n);
    int *a = (int*)_mm_malloc(sizeof(int)*n, 32);
    double *b1_cmp = (double*)_mm_malloc(sizeof(double)*n, 32);
    double *b1 = (double*)_mm_malloc(sizeof(double)*n, 32);
    double dtime;

    const char *fp_str[] = {
        "convert",
        "convert_vec4",
        "convert_vec4_unroll2",
        "convert_vec4_unroll2_openmp",
    };

    for (int i = 0; i < n; i++) {
        a[i] = rand();  // arbitrary test data
    }

    fp[0] = convert;
    fp[1] = convert_vec4;
    fp[2] = convert_vec4_unroll2;
    fp[3] = convert_vec4_unroll2_openmp;
    convert(a, b1_cmp, n);  // reference result

    dtime = omp_get_wtime();
    for (int i = 0; i < repeat; i++) {
        fp[ifunc](a, b1, n);
    }
    dtime = omp_get_wtime() - dtime;
    printf("\t%s time %.2f, error %f\n", fp_str[ifunc], dtime, compare(b1_cmp, b1, n));

    _mm_free(a);
    _mm_free(b1_cmp);
    _mm_free(b1);
}

int main() {
    int l1 = (32*1024)/(sizeof(int) + sizeof(double));
    int l2 = (256*1024)/(sizeof(int) + sizeof(double));
    int l3 = (8*1024*1024)/(sizeof(int) + sizeof(double));
    int lx = (100*1024*1024)/(sizeof(int) + sizeof(double));
    int n[] = {l1, l2, l3, lx};
    int repeat[] = {300000, 30000, 1000, 100};
    const char *cache_str[] = {"l1", "l2", "l3", "main memory"};
    for (int c = 0; c < 4; c++) {
        int lda = ((n[c]+7) & -8);  // round up to a multiple of 8
        printf("%s cache, n %d\n", cache_str[c], lda);
        for (int i = 0; i < 4; i++) {
            loop(lda, repeat[c], i);
        }
        printf("\n");
    }
}
Lastly, for anyone who has read this far and feels like reminding me that my code looks more like C than C++: please read this first before you comment: http://www.stroustrup.com/sibling_rivalry.pdf
Upvotes: 3
Reputation: 18227
Edit: See Adam's answer for a version using SSE intrinsics. Better than what I had here...
To make this more useful, let's look at compiler-generated code here. I'm using gcc 4.8.0, and yes, it is worth checking your specific compiler (and version), as there are quite significant differences in output for, say, gcc 4.4, gcc 4.8, clang 3.2 or Intel's icc.
Your original, compiled with g++ -O8 -msse4.2 ..., translates into the following inner loop:
.L2:
cvtsi2sd (%rcx,%rax,4), %xmm0
mulsd %xmm1, %xmm0
addl $1, %edx
movsd %xmm0, (%rsi,%rax,8)
movslq %edx, %rax
cmpq %rdi, %rax
jbe .L2
where %xmm1 holds 1.0/32768.0, so the compiler automatically turns the division into a multiplication by the reciprocal.
On the other hand, with g++ -msse4.2 -O8 -funroll-loops ..., the code created for the loop changes significantly:
[ ... ]
leaq -1(%rax), %rdi
movq %rdi, %r8
andl $7, %r8d
je .L3
[ ... insert a duff's device here, up to 6 * 2 conversions ... ]
jmp .L3
.p2align 4,,10
.p2align 3
.L39:
leaq 2(%rsi), %r11
cvtsi2sd (%rdx,%r10,4), %xmm9
mulsd %xmm0, %xmm9
leaq 5(%rsi), %r9
leaq 3(%rsi), %rax
leaq 4(%rsi), %r8
cvtsi2sd (%rdx,%r11,4), %xmm10
mulsd %xmm0, %xmm10
cvtsi2sd (%rdx,%rax,4), %xmm11
cvtsi2sd (%rdx,%r8,4), %xmm12
cvtsi2sd (%rdx,%r9,4), %xmm13
movsd %xmm9, (%rcx,%r10,8)
leaq 6(%rsi), %r10
mulsd %xmm0, %xmm11
mulsd %xmm0, %xmm12
movsd %xmm10, (%rcx,%r11,8)
leaq 7(%rsi), %r11
mulsd %xmm0, %xmm13
cvtsi2sd (%rdx,%r10,4), %xmm14
mulsd %xmm0, %xmm14
cvtsi2sd (%rdx,%r11,4), %xmm15
mulsd %xmm0, %xmm15
movsd %xmm11, (%rcx,%rax,8)
movsd %xmm12, (%rcx,%r8,8)
movsd %xmm13, (%rcx,%r9,8)
leaq 8(%rsi), %r9
movsd %xmm14, (%rcx,%r10,8)
movsd %xmm15, (%rcx,%r11,8)
movq %r9, %rsi
.L3:
cvtsi2sd (%rdx,%r9,4), %xmm8
mulsd %xmm0, %xmm8
leaq 1(%rsi), %r10
cmpq %rdi, %r10
movsd %xmm8, (%rcx,%r9,8)
jbe .L39
[ ... out ... ]
So it blocks the operations up, but still converts one value at a time.
If you change your original loop to operate on a few elements per iteration:
size_t i;
for (i = 0; i + 4 <= uIntegers.size(); i += 4)
{
    uDoubles[i]   = uIntegers[i]   / 32768.0;
    uDoubles[i+1] = uIntegers[i+1] / 32768.0;
    uDoubles[i+2] = uIntegers[i+2] / 32768.0;
    uDoubles[i+3] = uIntegers[i+3] / 32768.0;
}
for (; i < uIntegers.size(); i++)
    uDoubles[i] = uIntegers[i] / 32768.0;
the compiler, with plain gcc -msse4.2 -O8 ... (i.e. even without requesting unrolling), identifies the potential to use CVTDQ2PD/MULPD, and the core of the loop becomes:
.p2align 4,,10
.p2align 3
.L4:
movdqu (%rcx), %xmm0
addq $16, %rcx
cvtdq2pd %xmm0, %xmm1
pshufd $238, %xmm0, %xmm0
mulpd %xmm2, %xmm1
cvtdq2pd %xmm0, %xmm0
mulpd %xmm2, %xmm0
movlpd %xmm1, (%rdx,%rax,8)
movhpd %xmm1, 8(%rdx,%rax,8)
movlpd %xmm0, 16(%rdx,%rax,8)
movhpd %xmm0, 24(%rdx,%rax,8)
addq $4, %rax
cmpq %r8, %rax
jb .L4
cmpq %rdi, %rax
jae .L29
[ ... duff's device style for the "tail" ... ]
.L29:
rep ret
I.e. now the compiler recognizes the opportunity to put two doubles per SSE register and do the multiply/conversion in parallel. This is pretty close to the code that Adam's SSE-intrinsics version would generate.
The code in total (I've shown only about 1/6th of it) is much more complex than the "direct" intrinsics, because, as mentioned, the compiler prepends/appends unaligned or non-block-multiple "heads" and "tails" to the loop. Whether this is beneficial depends largely on the average/expected size of your vectors; for the "generic" case (vectors more than twice the size of the block processed by the innermost loop), it will help.
The result of this exercise is, largely: if you coerce (via compiler options/optimization) or hint (by slightly rearranging the code) your compiler into doing the right thing, then for this specific kind of copy/convert loop, it comes up with code that is not far behind hand-written intrinsics.
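One more hint of that kind, as a sketch (my addition, not part of the experiments above): when the loop operates on raw pointers instead of vectors, telling the compiler the buffers don't alias, via the `__restrict` extension supported by gcc, clang and MSVC, can be enough for it to vectorize the plain loop on its own.

```cpp
// With __restrict the compiler may assume `in` and `out` never overlap,
// which removes the aliasing hazard that otherwise blocks vectorization.
void convert_restrict(const int *__restrict in, double *__restrict out, int n)
{
    for (int i = 0; i < n; i++)
        out[i] = in[i] / 32768.0;
}
```

Whether this actually changes the generated code depends on the compiler and flags; inspect the assembly as above to check.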
Final experiment ... make the code:
#include <algorithm>
#include <vector>

static double c(int x) { return x / 32768.0; }

void Convert(const std::vector<int>& uIntegers, std::vector<double>& uDoubles)
{
    std::transform(uIntegers.begin(), uIntegers.end(), uDoubles.begin(), c);
}
and (for the nicest-to-read assembly output, this time using gcc 4.4 with gcc -O8 -msse4.2 ...) the generated core loop (again, there's a pre/post bit) becomes:
.p2align 4,,10
.p2align 3
.L8:
movdqu (%r9,%rax), %xmm0
addq $1, %rcx
cvtdq2pd %xmm0, %xmm1
pshufd $238, %xmm0, %xmm0
mulpd %xmm2, %xmm1
cvtdq2pd %xmm0, %xmm0
mulpd %xmm2, %xmm0
movapd %xmm1, (%rsi,%rax,2)
movapd %xmm0, 16(%rsi,%rax,2)
addq $16, %rax
cmpq %rcx, %rdi
ja .L8
cmpq %rbx, %rbp
leaq (%r11,%rbx,4), %r11
leaq (%rdx,%rbx,8), %rdx
je .L10
[ ... ]
.L10:
[ ... ]
ret
With that, what do we learn? If you want to use C++, really use C++ ;-)
Upvotes: 1
Reputation: 952
For maximum speed you want to convert more than one value per loop iteration. The easiest way to do that is with SIMD. Here's roughly how you'd do it with SSE2:
void CAudioDataItem::Convert(const vector<int> &uIntegers, vector<double> &uDoubles)
{
    __m128d scale = _mm_set_pd(1.0 / 32768.0, 1.0 / 32768.0);
    size_t i = 0;
    for (; i + 3 < uIntegers.size(); i += 4)
    {
        __m128i x = _mm_loadu_si128((const __m128i*)&uIntegers[i]);
        // Move the high two ints into the low lanes for the second conversion
        __m128i y = _mm_shuffle_epi32(x, _MM_SHUFFLE(3, 2, 3, 2));
        __m128d dx = _mm_cvtepi32_pd(x);
        __m128d dy = _mm_cvtepi32_pd(y);
        dx = _mm_mul_pd(dx, scale);
        dy = _mm_mul_pd(dy, scale);
        _mm_storeu_pd(&uDoubles[i], dx);
        _mm_storeu_pd(&uDoubles[i + 2], dy);
    }
    // Finish off the last 0-3 elements the slow way
    for (; i < uIntegers.size(); i++)
    {
        uDoubles[i] = uIntegers[i] / 32768.0;
    }
}
We process four integers per loop iteration. As we can only fit two doubles in the registers there's some duplicated work, but the extra unrolling will help performance unless the arrays are tiny.
Changing the data types to smaller ones (say short and float) should also help performance, because they cut down on memory bandwidth, and you can fit four floats in an SSE register. For audio data you shouldn't need the precision of a double.
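As a sketch of that idea (my addition, assuming plain SSE2 and 16-bit signed samples): with short in and float out, one register holds 8 shorts or 4 floats, so each iteration converts 8 samples with half the memory traffic of the int/double version.

```cpp
#include <emmintrin.h>  // SSE2
#include <cstddef>

void convert_short_float(const short *in, float *out, std::size_t n) {
    __m128 scale = _mm_set1_ps(1.0f / 32768.0f);
    std::size_t i = 0;
    for (; i + 7 < n; i += 8) {
        __m128i v = _mm_loadu_si128((const __m128i *)&in[i]);
        // Sign-extend the low/high four shorts to 32-bit ints:
        // interleave each short with itself, then arithmetic-shift right 16.
        __m128i lo = _mm_srai_epi32(_mm_unpacklo_epi16(v, v), 16);
        __m128i hi = _mm_srai_epi32(_mm_unpackhi_epi16(v, v), 16);
        _mm_storeu_ps(&out[i],     _mm_mul_ps(_mm_cvtepi32_ps(lo), scale));
        _mm_storeu_ps(&out[i + 4], _mm_mul_ps(_mm_cvtepi32_ps(hi), scale));
    }
    for (; i < n; i++)
        out[i] = in[i] / 32768.0f;  // scalar tail
}
```

The multiply by 1/32768 is exact here (a power of two), so the results match the division bit-for-bit.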
Note that I've used unaligned loads and stores. Aligned ones will be slightly quicker if the data is actually aligned (which it won't be by default, and it's hard to make stuff aligned inside a std::vector).
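For completeness, one way to get aligned std::vector storage is a custom allocator. This is a minimal sketch (the names are illustrative, and it uses POSIX posix_memalign; on Windows you'd substitute _aligned_malloc/_aligned_free):

```cpp
#include <cstdint>
#include <cstdlib>
#include <new>
#include <vector>

// Minimal C++11 allocator returning 16-byte-aligned storage, suitable for
// _mm_load_pd/_mm_store_pd on the vector's buffer.
template <typename T, std::size_t Align = 16>
struct aligned_allocator {
    using value_type = T;
    template <typename U> struct rebind { using other = aligned_allocator<U, Align>; };

    aligned_allocator() = default;
    template <typename U> aligned_allocator(const aligned_allocator<U, Align> &) {}

    T *allocate(std::size_t n) {
        void *p = nullptr;
        if (posix_memalign(&p, Align, n * sizeof(T)) != 0) throw std::bad_alloc();
        return static_cast<T *>(p);
    }
    void deallocate(T *p, std::size_t) { free(p); }
};
template <typename T, typename U, std::size_t A>
bool operator==(const aligned_allocator<T, A> &, const aligned_allocator<U, A> &) { return true; }
template <typename T, typename U, std::size_t A>
bool operator!=(const aligned_allocator<T, A> &, const aligned_allocator<U, A> &) { return false; }

using aligned_dvec = std::vector<double, aligned_allocator<double>>;
```

With that, the aligned load/store variants of the intrinsics become safe on the vector's data() pointer.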
Upvotes: 3
Reputation: 1549
Let me try another way: if multiplying really is better than dividing at the assembly-instruction level, writing the reciprocal as a constant guarantees that a multiply is emitted.
void CAudioDataItem::Convert(const vector<int> &uIntegers, vector<double> &uDoubles)
{
    for (size_t i = 0; i < uIntegers.size(); i++)
    {
        uDoubles[i] = uIntegers[i] * 0.000030517578125;  // == 1.0 / 32768.0
    }
}
Upvotes: 0
Reputation: 10770
If your array is large enough, it may be worthwhile to parallelize this loop. OpenMP's parallel for construct is what I would use.
The function would then be:
void CAudioDataItem::Convert(const vector<int> &uIntegers, vector<double> &uDoubles)
{
    #pragma omp parallel for
    for (int i = 0; i < (int)uIntegers.size(); i++)  // OpenMP wants a signed loop variable
    {
        uDoubles[i] = uIntegers[i] / 32768.0;
    }
}
With gcc you need to pass -fopenmp when you compile for the pragma to take effect; on MSVC it is /openmp. Since spawning threads has a noticeable overhead, this will only be faster if you are processing large arrays, YMMV.
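One way to keep the serial speed on small inputs is OpenMP's if() clause, which only parallelizes the loop past a size threshold. A sketch (the 100000 cutoff is an illustrative guess; tune it for your hardware):

```cpp
#include <vector>

// The pragma is ignored when compiled without OpenMP support, so this
// degrades gracefully to the plain serial loop.
void convert_omp_if(const std::vector<int> &in, std::vector<double> &out) {
    const int n = (int)in.size();
    #pragma omp parallel for if(n > 100000)
    for (int i = 0; i < n; i++)
        out[i] = in[i] / 32768.0;
}
```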
Upvotes: 3
Reputation: 13171
You might also try:
uDoubles[i] = ldexp((double)uIntegers[i], -15);
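To spell out why this works: ldexp(x, -15) scales by 2^-15 = 1/32768 by adjusting only the floating-point exponent, so no rounding is involved and it matches the division bit-for-bit. A quick check:

```cpp
#include <cmath>

// Exact scaling by 2^-15, equivalent to dividing by 32768.0.
double convert_ldexp(int x) { return std::ldexp((double)x, -15); }
```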
Upvotes: 1