tmighty
tmighty

Reputation: 11399

C++ conversion optimization

I would like to ask if there is a quicker way to do my audio conversion than by iterating through all values one by one and dividing them through 32768.

void CAudioDataItem::Convert(const vector<int>&uIntegers, vector<double> &uDoubles)
{
    for ( int i = 0; i <=uIntegers.size()-1;i++)
    {
        uDoubles[i] = uIntegers[i] / 32768.0;
    }
}

My approach works fine, but it could be quicker. However I did not find any way to speed it up.

Thank you for the help!

Upvotes: 12

Views: 799

Answers (6)

user2088790
user2088790

Reputation:

Your function is highly parallelizable. On modern Intel CPU there are three independent ways to parallelize: Instruction level parallelism (ILP), thread level parallelism (TLP), and SIMD. I was able to use all three to get big boosts in your function. The results are compiler dependent though. The boost is much less using GCC since it already vectorizes the function. See the table of numbers below.

However, the main limiting factor in your function is that it's time complexity is only O(n) and so there is a drastic drop in efficiency when the size of the array you're running over crosses each cache level boundary. If you look at for example at large dense matrix multiplication (GEMM) it's a O(n^3) operation so if one does things right (using e.g. loop tiling) the cache hierarchy is not a problem: you can get close to the maximum flops/s even for very large matrices (which seems to indicate that GEMM is one of the thinks Intel thinks of when they design the CPU). The way to fix this in your case is to find a way to do your function on a L1 cache block right after/before you do a more complex operation (for example that goes as O(n^2)) and then move to another L1 block. Of course I don't know what you're doing so I can't do that.

ILP is partially done for you by the CPU hardware. However, often carried loop dependencies limit the ILP so it often helps to do loop unrolling to take full advantage of the ILP. For TLP I use OpenMP, and for SIMD I used AVX (however the code below works for SSE as well). I used 32 byte aligned memory and made sure the array was a multiple of 8 so that no clean up was necessary.

Here are the results from Visual Studio 2012 64bit with AVX and OpenMP (release mode obviously) SandyBridge EP 4 cores (8 HW threads) @3.6 GHz. The variable n is the number of items. I repeat the function several times as well so the total time includes that. The function convert_vec4_unroll2_openmp gives the best results except in the L1 region. You can also cleary see that the efficiency drops significantly each time you move to a new cache level but even for main memory it's still better.

l1 chache, n 2752, repeat 300000
    covert time 1.34, error 0.000000
    convert_vec4 time 0.16, error 0.000000
    convert_vec4_unroll2 time 0.16, error 0.000000
    convert_vec4_unroll2_openmp time 0.31, error 0.000000

l2 chache, n 21856, repeat 30000
    covert time 1.14, error 0.000000
    convert_vec4 time 0.24, error 0.000000
    convert_vec4_unroll2 time 0.24, error 0.000000
    convert_vec4_unroll2_openmp time 0.12, error 0.000000

l3 chache, n 699072, repeat 1000
    covert time 1.23, error 0.000000
    convert_vec4 time 0.44, error 0.000000
    convert_vec4_unroll2 time 0.45, error 0.000000
    convert_vec4_unroll2_openmp time 0.14, error 0.000000

main memory , n 8738144, repeat 100
    covert time 1.56, error 0.000000
    convert_vec4 time 0.95, error 0.000000
    convert_vec4_unroll2 time 0.89, error 0.000000
    convert_vec4_unroll2_openmp time 0.51, error 0.000000

Results with g++ foo.cpp -mavx -fopenmp -ffast-math -O3 on a i5-3317 (ivy bridge) @ 2.4 GHz 2 cores (4 HW threads). GCC seems to vectorize this and the only benefit comes from OpenMP (which, however, gives a worse result in the L1 region).

l1 chache, n 2752, repeat 300000
    covert time 0.26, error 0.000000
    convert_vec4 time 0.22, error 0.000000
    convert_vec4_unroll2 time 0.21, error 0.000000
    convert_vec4_unroll2_openmp time 0.46, error 0.000000

l2 chache, n 21856, repeat 30000
    covert time 0.28, error 0.000000
    convert_vec4 time 0.27, error 0.000000
    convert_vec4_unroll2 time 0.27, error 0.000000
    convert_vec4_unroll2_openmp time 0.20, error 0.000000

l3 chache, n 699072, repeat 1000
    covert time 0.80, error 0.000000
    convert_vec4 time 0.80, error 0.000000
    convert_vec4_unroll2 time 0.80, error 0.000000
    convert_vec4_unroll2_openmp time 0.83, error 0.000000

main memory chache, n 8738144, repeat 100
    covert time 1.10, error 0.000000
    convert_vec4 time 1.10, error 0.000000
    convert_vec4_unroll2 time 1.10, error 0.000000
    convert_vec4_unroll2_openmp time 1.00, error 0.000000

Here is the code. I use the vectorclass http://www.agner.org/optimize/vectorclass.zip to do SIMD. This will use either AVX to write 4 doubles at once or SSE to write 2 doubles at once.

#include <stdlib.h>
#include <stdio.h>
#include <omp.h>
#include "vectorclass.h"

void convert(const int *uIntegers, double *uDoubles, const int n) {
    for ( int i = 0; i<n; i++) {
        uDoubles[i] = uIntegers[i] / 32768.0;
    }
}

void convert_vec4(const int *uIntegers, double *uDoubles, const int n) {
    Vec4d div = 1.0/32768;
    for ( int i = 0; i<n; i+=4) {
        Vec4i u4i = Vec4i().load(&uIntegers[i]);
        Vec4d u4d  = to_double(u4i);
        u4d*=div;
        u4d.store(&uDoubles[i]);
    }
}

void convert_vec4_unroll2(const int *uIntegers, double *uDoubles, const int n) {
    Vec4d div = 1.0/32768;
    for ( int i = 0; i<n; i+=8) {
        Vec4i u4i_v1 = Vec4i().load(&uIntegers[i]);
        Vec4d u4d_v1  = to_double(u4i_v1);
        u4d_v1*=div;
        u4d_v1.store(&uDoubles[i]);

        Vec4i u4i_v2 = Vec4i().load(&uIntegers[i+4]);
        Vec4d u4d_v2  = to_double(u4i_v2);
        u4d_v2*=div;
        u4d_v2.store(&uDoubles[i+4]);
    }
}

void convert_vec4_openmp(const int *uIntegers, double *uDoubles, const int n) {
    #pragma omp parallel for    
    for ( int i = 0; i<n; i+=4) {
        Vec4i u4i = Vec4i().load(&uIntegers[i]);
        Vec4d u4d  = to_double(u4i);
        u4d/=32768.0;
        u4d.store(&uDoubles[i]);
    }
}

void convert_vec4_unroll2_openmp(const int *uIntegers, double *uDoubles, const int n) {
    Vec4d div = 1.0/32768;
    #pragma omp parallel for    
    for ( int i = 0; i<n; i+=8) {
        Vec4i u4i_v1 = Vec4i().load(&uIntegers[i]);
        Vec4d u4d_v1  = to_double(u4i_v1);
        u4d_v1*=div;
        u4d_v1.store(&uDoubles[i]);

        Vec4i u4i_v2 = Vec4i().load(&uIntegers[i+4]);
        Vec4d u4d_v2  = to_double(u4i_v2);
        u4d_v2*=div;
        u4d_v2.store(&uDoubles[i+4]);
    }
}

double compare(double *a, double *b, const int n) {
    double diff = 0.0;
    for(int i=0; i<n; i++) {
        double tmp = a[i] - b[i];
        //printf("%d %f %f \n", i, a[i], b[i]);
        if(tmp<0) tmp*=-1;
        diff += tmp;
    }
    return diff;
}

void loop(const int n, const int repeat, const int ifunc) {
    void (*fp[4])(const int *uIntegers, double *uDoubles, const int n);

    int *a = (int*)_mm_malloc(sizeof(int)* n, 32);
    double *b1_cmp = (double*)_mm_malloc(sizeof(double)*n, 32);
    double *b1 = (double*)_mm_malloc(sizeof(double)*n, 32);
    double dtime;

    const char *fp_str[] = {
        "covert",
        "convert_vec4",
        "convert_vec4_unroll2",
        "convert_vec4_unroll2_openmp",
    };

    for(int i=0; i<n; i++) {
        a[i] = rand()*RAND_MAX;
    }

    fp[0] = convert;
    fp[1] = convert_vec4;
    fp[2] = convert_vec4_unroll2;
    fp[3] = convert_vec4_unroll2_openmp;

    convert(a, b1_cmp, n);

    dtime = omp_get_wtime();
    for(int i=0; i<repeat; i++) {
        fp[ifunc](a, b1, n);
    }
    dtime = omp_get_wtime() - dtime;
    printf("\t%s time %.2f, error %f\n", fp_str[ifunc], dtime, compare(b1_cmp,b1,n));

    _mm_free(a);
    _mm_free(b1_cmp);
    _mm_free(b1);
}

int main() {
    double dtime;
    int l1 = (32*1024)/(sizeof(int) + sizeof(double));
    int l2 = (256*1024)/(sizeof(int) + sizeof(double));
    int l3 = (8*1024*1024)/(sizeof(int) + sizeof(double));
    int lx = (100*1024*1024)/(sizeof(int) + sizeof(double));
    int n[] = {l1, l2, l3, lx};

    int repeat[] = {300000, 30000, 1000, 100};
    const char *cache_str[] = {"l1", "l2", "l3", "main memory"};
    for(int c=0; c<4; c++ ) {
        int lda = ((n[c]+7) & -8); //make sure array is a multiple of 8
        printf("%s chache, n %d\n", cache_str[c], lda);
        for(int i=0; i<4; i++) {
            loop(lda, repeat[c], i);
        } printf("\n");
    }
}

Lastly, anyone who has read this far and feels like reminding me that my code looks more like C than C++ please read this first before you decide to comment http://www.stroustrup.com/sibling_rivalry.pdf

Upvotes: 3

FrankH.
FrankH.

Reputation: 18227

Edit: See Adam's answer above for a version using SSE intrinsics. Better than what I had here ...

To make this more useful, let's look at compiler-generated code here. I'm using gcc 4.8.0 and yes, it is worth checking your specific compiler (version) as there are quite significant differences in output for, say, gcc 4.4, 4.8, clang 3.2 or Intel's icc.

Your original, using g++ -O8 -msse4.2 ... translates into the following loop:

.L2:
    cvtsi2sd        (%rcx,%rax,4), %xmm0
    mulsd   %xmm1, %xmm0
    addl    $1, %edx
    movsd   %xmm0, (%rsi,%rax,8)
    movslq  %edx, %rax
    cmpq    %rdi, %rax
    jbe     .L2

where %xmm1 holds 1.0/32768.0 so the compiler automatically turns the division into multiplication-by-reverse.

On the other hand, using g++ -msse4.2 -O8 -funroll-loops ..., the code created for the loop changes significantly:

[ ... ]
    leaq    -1(%rax), %rdi
    movq    %rdi, %r8
    andl    $7, %r8d
    je      .L3
[ ... insert a duff's device here, up to 6 * 2 conversions ... ]
    jmp     .L3
    .p2align 4,,10
    .p2align 3
.L39:
    leaq    2(%rsi), %r11
    cvtsi2sd        (%rdx,%r10,4), %xmm9
    mulsd   %xmm0, %xmm9
    leaq    5(%rsi), %r9
    leaq    3(%rsi), %rax
    leaq    4(%rsi), %r8
    cvtsi2sd        (%rdx,%r11,4), %xmm10
    mulsd   %xmm0, %xmm10
    cvtsi2sd        (%rdx,%rax,4), %xmm11
    cvtsi2sd        (%rdx,%r8,4), %xmm12
    cvtsi2sd        (%rdx,%r9,4), %xmm13
    movsd   %xmm9, (%rcx,%r10,8)
    leaq    6(%rsi), %r10
    mulsd   %xmm0, %xmm11
    mulsd   %xmm0, %xmm12
    movsd   %xmm10, (%rcx,%r11,8)
    leaq    7(%rsi), %r11
    mulsd   %xmm0, %xmm13
    cvtsi2sd        (%rdx,%r10,4), %xmm14
    mulsd   %xmm0, %xmm14
    cvtsi2sd        (%rdx,%r11,4), %xmm15
    mulsd   %xmm0, %xmm15
    movsd   %xmm11, (%rcx,%rax,8)
    movsd   %xmm12, (%rcx,%r8,8)
    movsd   %xmm13, (%rcx,%r9,8)
    leaq    8(%rsi), %r9
    movsd   %xmm14, (%rcx,%r10,8)
    movsd   %xmm15, (%rcx,%r11,8)
    movq    %r9, %rsi
.L3:
    cvtsi2sd        (%rdx,%r9,4), %xmm8
    mulsd   %xmm0, %xmm8
    leaq    1(%rsi), %r10
    cmpq    %rdi, %r10
    movsd   %xmm8, (%rcx,%r9,8)
    jbe     .L39
[ ... out ... ]

So it blocks the operations up, but still converts one-value-at-a-time.

If you change your original loop to operate on a few elements per iteration:

size_t i;
for (i = 0; i < uIntegers.size() - 3; i += 4)
{
    uDoubles[i] = uIntegers[i] / 32768.0;
    uDoubles[i+1] = uIntegers[i+1] / 32768.0;
    uDoubles[i+2] = uIntegers[i+2] / 32768.0;
    uDoubles[i+3] = uIntegers[i+3] / 32768.0;
}
for (; i < uIntegers.size(); i++)
    uDoubles[i] = uIntegers[i] / 32768.0;

the compiler, gcc -msse4.2 -O8 ... (i.e. even without requesting unrolling), identifies the potential to use CVTDQ2PD/MULPD and the core of the loop becomes:

    .p2align 4,,10
    .p2align 3
.L4:
    movdqu  (%rcx), %xmm0
    addq    $16, %rcx
    cvtdq2pd        %xmm0, %xmm1
    pshufd  $238, %xmm0, %xmm0
    mulpd   %xmm2, %xmm1
    cvtdq2pd        %xmm0, %xmm0
    mulpd   %xmm2, %xmm0
    movlpd  %xmm1, (%rdx,%rax,8)
    movhpd  %xmm1, 8(%rdx,%rax,8)
    movlpd  %xmm0, 16(%rdx,%rax,8)
    movhpd  %xmm0, 24(%rdx,%rax,8)
    addq    $4, %rax
    cmpq    %r8, %rax
    jb      .L4
    cmpq    %rdi, %rax
    jae     .L29
[ ... duff's device style for the "tail" ... ]
.L29:
    rep ret

I.e. now the compiler recognizes the opportunity to put two double per SSE register, and do parallel multiply / conversion. This is pretty close to the code that Adam's SSE intrinsics version would generate.

The code in total (I've shown only about 1/6th of it) is much more complex than the "direct" intrinsics, due to the fact that, as mentioned, the compiler tries to prepend/append unaligned / not-block-multiple "heads" and "tails" to the loop. It largely depends on the average/expected sizes of your vectors whether this will be beneficial or not; for the "generic" case (vectors more than twice the size of the block processed by the "innermost" loop), it'll help.

The result of this exercise is, largely ... that, if you coerce (by compiler options/optimization) or hint (by slightly rearranging the code) your compiler to do the right thing, then for this specific kind of copy/convert loop, it comes up with code that's not going to be much behind hand-written intrinsics.

Final experiment ... make the code:

static double c(int x) { return x / 32768.0; }
void Convert(const std::vector<int>& uIntegers, std::vector<double>& uDoubles)
{
    std::transform(uIntegers.begin(), uIntegers.end(), uDoubles.begin(), c);
}

and (for the nicest-to-read assembly output, this time using gcc 4.4 with gcc -O8 -msse4.2 ...) the generated assembly core loop (again, there's a pre/post bit) becomes:

    .p2align 4,,10
    .p2align 3
.L8:
    movdqu  (%r9,%rax), %xmm0
    addq    $1, %rcx
    cvtdq2pd        %xmm0, %xmm1
    pshufd  $238, %xmm0, %xmm0
    mulpd   %xmm2, %xmm1
    cvtdq2pd        %xmm0, %xmm0
    mulpd   %xmm2, %xmm0
    movapd  %xmm1, (%rsi,%rax,2)
    movapd  %xmm0, 16(%rsi,%rax,2)
    addq    $16, %rax
    cmpq    %rcx, %rdi
    ja      .L8
    cmpq    %rbx, %rbp
    leaq    (%r11,%rbx,4), %r11
    leaq    (%rdx,%rbx,8), %rdx
    je      .L10
[ ... ]
.L10:
[ ... ]
    ret

With that, what do we learn ? If you want to use C++, really use C++ ;-)

Upvotes: 1

Adam
Adam

Reputation: 952

For maximum speed you want to convert more than one value per loop iteration. The easiest way to do that is with SIMD. Here's roughly how you'd do it with SSE2:

void CAudioDataItem::Convert(const vector<int>&uIntegers, vector<double> &uDoubles)
{
    __m128d scale = _mm_set_pd( 1.0 / 32768.0, 1.0 / 32768.0 );
    int i = 0;
    for ( ; i < uIntegers.size() - 3; i += 4)
    {
        __m128i x = _mm_loadu_si128(&uIntegers[i]);
        __m128i y = _mm_shuffle_epi32(x, _MM_SHUFFLE(2,3,0,0) );
        __m128d dx = _mm_cvtepi32_pd(x);
        __m128d dy = _mm_cvtepi32_pd(y);
        dx = _mm_mul_pd(dx, scale);
        dy = _mm_mul_pd(dy, scale);
        _mm_storeu_pd(dx, &uDoubles[i]);
        _mm_storeu_pd(dy, &uDoubles[i + 2]);
    }
    // Finish off the last 0-3 elements the slow way
    for ( ; i < uIntegers.size(); i ++)
    {
        uDoubles[i] = uIntegers[i] / 32768.0;
    }
}

We process four integers per loop iteration. As we can only fit two doubles in the registers there's some duplicated work, but the extra unrolling will help performance unless the arrays are tiny.

Changing the data types to smaller ones (say short and float) should also help performance, because they cut down on memory bandwidth, and you can fit four floats in an SSE register. For audio data you shouldn't need the precision of a double.

Note that I've used unaligned loads and stores. Aligned ones will be slightly quicker if the data is actually aligned (which it won't be by default, and it's hard to make stuff aligned inside a std::vector).

Upvotes: 3

ruben2020
ruben2020

Reputation: 1549

Let me try another way:

If multiplying is seriously better from the perspective of assembly instructions, then this should guarantee that it will get multiplied.

void CAudioDataItem::Convert(const vector<int>&uIntegers, vector<double> &uDoubles)
{
    for ( int i = 0; i <=uIntegers.size()-1;i++)
    {
        uDoubles[i] = uIntegers[i] * 0.000030517578125;
    }
}

Upvotes: 0

SirGuy
SirGuy

Reputation: 10770

If your array is large enough it may be worthwhile to parallelize this for loop. OpenMP's parallel for statement is what I would use.
The function would then be:

    void CAudioDataItem::Convert(const vector<int>&uIntegers, vector<double> &uDoubles)
    {
        #pragma omp parallel for
        for (int i = 0; i < uIntegers.size(); i++)
        {
            uDoubles[i] = uIntegers[i] / 32768.0;
        }
    }

with gcc you need to pass -fopenmp when you compile for the pragma to be used, on MSVC it is /openmp. Since spawning threads has a noticeable overhead, this will only be faster if you are processing large arrays, YMMV.

Upvotes: 3

Lee Daniel Crocker
Lee Daniel Crocker

Reputation: 13171

You might also try:

uDoubles[i] = ldexp((double)uIntegers[i], -15);

Upvotes: 1

Related Questions