Desmador
Desmador

Reputation: 13

Threaded radixsort not faster

I'm having trouble understanding why my multi-threaded implementation is not faster than running on a single thread.

I'm trying to implement a bitwise MSB radix sort (which is working for now) with a certain number (n) of threads. In my approach, I create new threads within the recursion, up to log2(n). But this seems to just increase the run time because of the cost of creating the threads. I'm guessing that the created threads cannot work on the array at the same time, even though the responsibility for it is split between them.

If someone could explain it I would really appreciate it. Thanks!!

UPDATE ::

As it turns out, I was missing the most obvious solution, as Ulrich commented. I was using a VM that was locked to one CPU, which I did not know. After changing that, I got the desired speedup for the radix sort.

Thank you all for the comments and general info on that topic. ^^

Upvotes: 1

Views: 400

Answers (1)

rcgldr
rcgldr

Reputation: 28826

The maximum number of threads should be limited to the number of cores on your system, or two times the number of cores if each core has hyper-threading. You've since updated your question to note that you only had one core using a virtual machine, and the multi-threaded version is faster with multiple cores.

Other potential issues: if the distribution of the data is not uniform, then splitting into bins will result in uneven sized bins. Using a base larger than 2, such as 256, will reduce this effect somewhat. On the initial pass, there can be cache line conflicts, where each write to a cache line on one core will invalidate all instances of that cache line on other cores.

Below is example code that uses a single thread to do an MSD-first base-256 radix sort pass to create 256 bins, which are then sorted using an LSD-first radix sort, using 4 threads. This example uses the Windows-specific thread API. It relies on having reasonably uniform data so that the 256 bins are somewhat equal in size. I didn't try to use multi-threading for the initial MSD pass used to create the 256 bins, as one potential issue is cache line conflicts on the array of index updates, which would need an atomic increment operation, which may limit the performance benefit.

#include <iostream>
#include <utility>
#include <windows.h>

// Build-time configuration for the benchmark.
#define COUNT (6000*6000)            // number of values to sort (36,000,000 keys)

// Timer selection: non-zero uses QueryPerformanceCounter and reports seconds,
// zero falls back to clock() and reports ticks.
#define QP 1        // if != 0, use queryperformance for timer

#if QP
#include <math.h>
// winmm.lib is linked for the timeBeginPeriod/timeEndPeriod calls in main().
#pragma comment(lib, "winmm.lib")
// Short alias for the 64-bit counter type used by the QueryPerformance* API.
typedef LARGE_INTEGER LI64;
#else
// clock_t / clock() for the fallback timer.
#include <ctime>
#endif

// Timer state; which set exists depends on the QP switch above.
#if QP
static LI64     liQPFrequency;  // cpu counter values: counter frequency
static LI64     liStartTime;    // counter value sampled just before the sort
static LI64     liStopTime;     // counter value sampled just after the sort
static double   dQPFrequency;   // liQPFrequency converted to double
static double   dStartTime;     // liStartTime converted to double
static double   dStopTime;      // liStopTime converted to double
static double   dElapsedTime;   // (stop - start) / frequency, printed as seconds
#else
clock_t ctTimeStart;            // clock values: clock() before the sort
clock_t ctTimeStop;             // clock() after the sort
#endif

// Local fixed-width integer aliases.
// NOTE(review): these assume unsigned int is 32 bits and unsigned long long is
// 64 bits, and they can clash with <cstdint> on platforms where uint64_t is a
// different underlying type -- confirm before porting.
typedef unsigned       int uint32_t;
typedef unsigned long long uint64_t;

// State shared between RadixSort() (on the calling thread) and the worker
// threads Thread1..Thread3.

static HANDLE hs0;                      // semaphore handles: hs0 is released by each worker when its bin is done
static HANDLE hs1;                      // released by RadixSort() to start Thread1
static HANDLE hs2;                      // released by RadixSort() to start Thread2
static HANDLE hs3;                      // released by RadixSort() to start Thread3

static HANDLE ht1;                      // thread handles, waited on and closed in main()
static HANDLE ht2;
static HANDLE ht3;

static uint64_t *pa, *pb;               // ptrs to arrays: workers sort each bin from pb into pa
static uint32_t aIndex[260];            // start + end indexes, 256 bins (entries 0..256 are used)
static uint32_t aIi;                    // current index for 4 bins; plain (non-atomic) variable advanced by RadixSort()

// Forward declaration: RadixSort() runs Thread0 directly on the calling thread.
static DWORD WINAPI Thread0(LPVOID lpvoid);

// Sorts `count` 64-bit keys: one single-threaded pass distributes the keys
// from `a` into `b` by their most significant byte (256 bins), then the bins
// are sorted four at a time, one on the calling thread (Thread0) and three on
// the worker threads Thread1..Thread3.
//
// Parameters:
//   a     - keys to sort
//   b     - scratch array of at least `count` elements
//   count - number of keys; counters are 32-bit, so it must fit in uint32_t
//
// The per-bin sorts do not use the parameters: they read from the global `pb`
// and write to the global `pa`. Callers must therefore set pa == a and
// pb == b before calling, and the semaphores and worker threads must already
// exist. The sorted keys end up in `a`.
void RadixSort(uint64_t * a, uint64_t *b, size_t count)
{
uint32_t i,m,n;
uint64_t u;
    // aIndex is a global that still holds the previous call's offsets;
    // clear it so the histogram starts from zero on every call.
    for(i = 0; i < sizeof(aIndex)/sizeof(aIndex[0]); i++)
        aIndex[i] = 0;
    for(i = 0; i < count; i++)          // generate histogram
        aIndex[(size_t)(a[i] >> 56)]++;
    m = 0;                              // convert to indexes
    for(i = 0; i < 257; i++){           //  including end of bin[255]
        n = aIndex[i];
        aIndex[i] = m;
        m += n;
    }
    for(i = 0; i < count; i++){         // sort by msb
        u = a[i];
        m = (uint32_t)(u >> 56);
        b[aIndex[m]++] = u;
    }
    // The scatter above advanced every aIndex[k] to the end of bin k, which
    // is the start of bin k+1; shifting up by one restores the start offsets.
    for(i = 256; i; i--)                // restore aIndex
        aIndex[i] = aIndex[i-1];
    aIndex[0] = 0;
    for(aIi = 0; aIi < 256; aIi += 4){
        ReleaseSemaphore(hs1, 1, NULL);     // start threads
        ReleaseSemaphore(hs2, 1, NULL);
        ReleaseSemaphore(hs3, 1, NULL);
        Thread0(NULL);                      // sort the first bin of the group here
        WaitForSingleObject(hs0, INFINITE); // wait for threads done
        WaitForSingleObject(hs0, INFINITE);
        WaitForSingleObject(hs0, INFINITE);
    }        
}

// Least-significant-digit-first radix sort on the low seven bytes of each key
// (base 256, seven passes); the most significant byte is ignored.
//
// Parameters:
//   a     - input keys; overwritten with intermediate results
//   b     - buffer of at least `count` elements
//   count - number of keys; counters are 32-bit, so it must fit in uint32_t
//
// The passes alternate between the two buffers, and because the number of
// passes is odd the fully sorted keys finish in the caller's `b`.
void RadixSort7(uint64_t * a, uint64_t *b, size_t count)
{
    uint32_t offsets[7][256] = {0};     // per-digit counts, then start offsets

    // One sweep over the input builds the histograms for all seven digits.
    for(uint32_t idx = 0; idx < count; idx++){
        uint64_t key = a[idx];
        for(uint32_t digit = 0; digit < 7; digit++, key >>= 8)
            offsets[digit][(size_t)(key & 0xff)]++;
    }

    // Exclusive prefix sum: each count becomes the start offset of its bucket.
    for(uint32_t digit = 0; digit < 7; digit++){
        uint32_t running = 0;
        for(uint32_t bucket = 0; bucket < 256; bucket++){
            const uint32_t bucketCount = offsets[digit][bucket];
            offsets[digit][bucket] = running;
            running += bucketCount;
        }
    }

    // Stable scatter, one digit per pass, ping-ponging between the buffers.
    uint64_t *src = a;
    uint64_t *dst = b;
    for(uint32_t digit = 0; digit < 7; digit++){
        const uint32_t shift = digit << 3;
        uint32_t *next = offsets[digit];
        for(uint32_t idx = 0; idx < count; idx++){
            const uint64_t key = src[idx];
            dst[next[(size_t)(key >> shift) & 0xff]++] = key;
        }
        std::swap(src, dst);            // output of this pass feeds the next
    }
}

// Sorts the first bin of the current group of four (bin aIi). RadixSort()
// calls this directly on its own thread; `lpvoid` is unused and the return
// value is always 0.
static DWORD WINAPI Thread0(LPVOID lpvoid)
{
    const uint32_t binStart = aIndex[aIi];
    const uint32_t binEnd   = aIndex[aIi + 1];
    RadixSort7(pb + binStart, pa + binStart, binEnd - binStart);
    return 0;
}

// Worker thread for the second bin of each group of four (bin aIi+1).
// Each round it waits on hs1, sorts its bin from pb into pa, and releases hs0
// to report completion. It returns 0 after the last group (aIi == 252).
// `lpvoid` is unused.
static DWORD WINAPI Thread1(LPVOID lpvoid)
{
    while(1){
        WaitForSingleObject(hs1, INFINITE); // wait for semaphore
        // Snapshot the group index now: once hs0 is released, RadixSort() may
        // advance aIi at any moment, so re-reading it afterwards could make
        // this thread exit one round early or miss its exit and wait forever.
        const uint32_t group = aIi;
        RadixSort7(pb + aIndex[group+1], pa + aIndex[group+1], aIndex[group+2]-aIndex[group+1]);
        ReleaseSemaphore(hs0, 1, NULL);     // indicate done
        if(group == 252)                    // exit if all bins done
            return 0;
    }
}

// Worker thread for the third bin of each group of four (bin aIi+2).
// Each round it waits on hs2, sorts its bin from pb into pa, and releases hs0
// to report completion. It returns 0 after the last group (aIi == 252).
// `lpvoid` is unused.
static DWORD WINAPI Thread2(LPVOID lpvoid)
{
    while(1){
        WaitForSingleObject(hs2, INFINITE); // wait for semaphore
        // Snapshot the group index now: once hs0 is released, RadixSort() may
        // advance aIi at any moment, so re-reading it afterwards could make
        // this thread exit one round early or miss its exit and wait forever.
        const uint32_t group = aIi;
        RadixSort7(pb + aIndex[group+2], pa + aIndex[group+2], aIndex[group+3]-aIndex[group+2]);
        ReleaseSemaphore(hs0, 1, NULL);     // indicate done
        if(group == 252)                    // exit if all bins done
            return 0;
    }
}

// Worker thread for the fourth bin of each group of four (bin aIi+3).
// Each round it waits on hs3, sorts its bin from pb into pa, and releases hs0
// to report completion. It returns 0 after the last group (aIi == 252).
// `lpvoid` is unused.
static DWORD WINAPI Thread3(LPVOID lpvoid)
{
    while(1){
        WaitForSingleObject(hs3, INFINITE); // wait for semaphore
        // Snapshot the group index now: once hs0 is released, RadixSort() may
        // advance aIi at any moment, so re-reading it afterwards could make
        // this thread exit one round early or miss its exit and wait forever.
        const uint32_t group = aIi;
        RadixSort7(pb + aIndex[group+3], pa + aIndex[group+3], aIndex[group+4]-aIndex[group+3]);
        ReleaseSemaphore(hs0, 1, NULL);     // indicate done
        if(group == 252)                    // exit if all bins done
            return 0;
    }
}

// Returns the next value of a 64-bit linear congruential generator:
// state = state * multiplier + increment, wrapping modulo 2^64.
// The state is a function-local static starting at 1, so the sequence is the
// same on every run.
uint64_t rnd64()                        // random 64 bit integer
{
    static uint64_t state = 1ull;
    const uint64_t multiplier = 6364136223846793005ull;
    const uint64_t increment  = 1442695040888963407ull;
    state *= multiplier;
    state += increment;
    return state;
}

int main( )
{
    uint64_t * a = new uint64_t [COUNT];    // allocate data array
    uint64_t * b = new uint64_t [COUNT];    // allocate temp array
    size_t i;
    pa = a;                                 // set global ptrs
    pb = b;

    hs0 = CreateSemaphore(NULL,0,7,NULL);   // create semaphores
    hs1 = CreateSemaphore(NULL,0,1,NULL);
    hs2 = CreateSemaphore(NULL,0,1,NULL);
    hs3 = CreateSemaphore(NULL,0,1,NULL);

    ht1 = CreateThread(NULL, 0, Thread1, 0, 0, 0);  // create threads
    ht2 = CreateThread(NULL, 0, Thread2, 0, 0, 0);
    ht3 = CreateThread(NULL, 0, Thread3, 0, 0, 0);

    for(i = 0; i < COUNT; i++){             // generate data
        a[i] = rnd64();
    }

#if QP
    QueryPerformanceFrequency(&liQPFrequency);
    dQPFrequency = (double)liQPFrequency.QuadPart;
    timeBeginPeriod(1);                     // set ticker to 1000 hz
    Sleep(128);                             // wait for it to settle
    QueryPerformanceCounter(&liStartTime);
#else
    ctTimeStart = clock();
#endif

    RadixSort(a, b, COUNT);

#if QP
    QueryPerformanceCounter(&liStopTime);
    dStartTime = (double)liStartTime.QuadPart;
    dStopTime  = (double)liStopTime.QuadPart;
    dElapsedTime = (dStopTime - dStartTime) / dQPFrequency;
    timeEndPeriod(1);                       // restore ticker to default
    std::cout << "# of seconds " << dElapsedTime << std::endl;
#else
    ctTimeStop = clock();
    std::cout << "# of ticks " << ctTimeStop - ctTimeStart << std::endl;
#endif

    WaitForSingleObject(ht1, INFINITE); // wait for threads
    WaitForSingleObject(ht2, INFINITE);
    WaitForSingleObject(ht3, INFINITE);
    CloseHandle(ht1);                   // close threads
    CloseHandle(ht2);
    CloseHandle(ht3);
    CloseHandle(hs0);                   // close semaphores
    CloseHandle(hs1);
    CloseHandle(hs2);
    CloseHandle(hs3);
    
    for(i = 1; i < COUNT; i++){
        if(a[i] < a[i-1]){
            break;}}
    if(i == COUNT)
        std::cout << "passed" << std::endl;
    else
        std::cout << "failed" << std::endl;

    delete[] b;
    delete[] a;
    return 0;
}

Upvotes: 2

Related Questions