Reputation: 535
My first version of a two-thread selection sort starts a new thread on every iteration.
Some comparisons (left: one-thread version, right: two-thread version; times in ms):
Size: 500 time 1 time 5
Size: 1000 time 1 time 9
Size: 1500 time 4 time 12
Size: 2000 time 5 time 16
Size: 2500 time 10 time 22
Size: 3000 time 14 time 26
Size: 3500 time 19 time 30
Size: 4000 time 24 time 36
Size: 4500 time 30 time 43
Size: 5000 time 37 time 49
Size: 5500 time 46 time 57
Size: 6000 time 55 time 66
Size: 6500 time 63 time 76
Size: 7000 time 74 time 80
Size: 7500 time 85 time 92
Size: 8000 time 96 time 102
Size: 8500 time 108 time 109
Size: 9000 time 122 time 124
Size: 9500 time 135 time 132
Size: 10000 time 150 time 144
Size: 10500 time 165 time 156
Size: 11000 time 181 time 174
Size: 11500 time 200 time 177
Size: 12000 time 218 time 195
Size: 12500 time 235 time 205
Size: 13000 time 255 time 214
Size: 13500 time 273 time 226
Size: 14000 time 296 time 245
For arrays of size 9500 and larger, the two-thread version is faster. In my second implementation the thread is started only once, but its performance is unbelievably bad.
My cpu has 4 cores.
Size: 0 time 0 time 0
Size: 50 time 0 time 151
Size: 100 time 0 time 1276
Size: 150 time 0 time 2089
Size: 200 time 0 time 3925
Size: 250 time 0 time 5303
Code:
//two threads, first version: a new asynchronous task is started on every iteration
// Selection sort over [beg, end) that shares each minimum search between the
// calling thread and one asynchronous task.
//
// For every position `it`, the unsorted tail [it, end) is cut at its midpoint.
// A task launched with std::launch::async scans the first half while the
// caller scans the second half; the smaller of the two minima is then swapped
// into position `it`. A new task is launched on every iteration.
//
// ItType: forward iterator whose value type supports operator<.
// beg:    first element of the range to sort.
// end:    one past the last element of the range to sort.
template<class ItType>
void selectionSortThreadsHelper2(ItType beg, ItType end)
{
    for (auto it = beg; it != end; ++it) {
        // std::next (rather than `it + n`) also works for iterators that are
        // not random-access, e.g. std::list.
        const ItType middleIt = std::next(it, std::distance(it, end) / 2);

        // Capture by value: the task gets its own copies of the bounds.
        auto search = [it, middleIt] { return std::min_element(it, middleIt); };
        std::future<ItType> minFirstHalfResult(std::async(std::launch::async, search));

        // Scan the second half here while the task scans the first half.
        const ItType minSecondHalfIt = std::min_element(middleIt, end);
        const ItType minFirstHalfIt = minFirstHalfResult.get();

        // If the first half is empty, min_element returned middleIt == it,
        // which is still dereferenceable because it != end.
        // On ties the first half wins, so an element that is already in place
        // is not swapped with an equal one further right.
        const ItType minIt = *minSecondHalfIt < *minFirstHalfIt ? minSecondHalfIt
                                                                : minFirstHalfIt;
        if (minIt != it)
            std::iter_swap(minIt, it);
    }
}
//two threads, second version: the worker thread is started only once
// Selection sort over [beg, end) that shares each minimum search between the
// calling thread and a single worker task that is started once and reused.
//
// Per iteration the caller publishes the bounds of the first half of the
// unsorted tail, raises readyFlag, scans the second half itself, and then
// waits until the worker reports its result through `processed`.
//
// Handshake (all flags are read and written under readyMutex):
//   readyFlag  caller -> worker   a search request is pending (or quit is set)
//   processed  worker -> caller   minFirstHalfIt holds the answer
//   quit       caller -> worker   leave the loop and finish the task
//
// ItType: default-constructible forward iterator whose value type supports
//         operator<.
// beg:    first element of the range to sort.
// end:    one past the last element of the range to sort.
template<class ItType>
void selectionSortThreadsHelper3(ItType beg, ItType end)
{
    bool quit = false;
    bool readyFlag = false;
    bool processed = false;
    std::mutex readyMutex;
    std::condition_variable readyCondVar;
    ItType it;              // current position; written by the caller only
    ItType middleIt;        // end of the worker's half; written by the caller only
    ItType minFirstHalfIt;  // worker's result; written by the worker only

    auto search = [&]() {
        while (true) {
            std::unique_lock<std::mutex> ul(readyMutex);
            readyCondVar.wait(ul, [&] { return readyFlag; });
            if (quit)
                return;
            // Consume the request. Without this reset the next wait() returns
            // immediately and the worker repeats the same search over and over
            // while holding the mutex, which keeps the caller from acquiring
            // it to collect the result.
            readyFlag = false;
            minFirstHalfIt = std::min_element(it, middleIt);
            processed = true;
            ul.unlock();
            readyCondVar.notify_one();
        }
    };
    std::future<void> f(std::async(std::launch::async, search));

    // Sort element by element.
    for (it = beg; it != end; ++it) {
        // The worker is idle here (no request pending), so the shared
        // iterators can be updated without holding the mutex; the lock taken
        // below publishes them together with readyFlag.
        middleIt = std::next(it, std::distance(it, end) / 2);

        // Ask the worker to search [it, middleIt).
        {
            std::lock_guard<std::mutex> lg(readyMutex);
            readyFlag = true;
        }
        readyCondVar.notify_one();

        // Meanwhile search [middleIt, end) on this thread.
        const ItType minSecondHalfIt = std::min_element(middleIt, end);

        // Wait for the worker's result.
        {
            std::unique_lock<std::mutex> ul(readyMutex);
            readyCondVar.wait(ul, [&] { return processed; });
            processed = false;
        }

        // If the first half is empty the worker returned middleIt == it, which
        // is dereferenceable because it != end. On ties the first half wins,
        // so equal elements are not swapped needlessly.
        const ItType minIt = *minSecondHalfIt < *minFirstHalfIt ? minSecondHalfIt
                                                                : minFirstHalfIt;
        if (minIt != it)
            std::iter_swap(minIt, it);
    }

    // Tell the worker to quit and wait for it to finish.
    {
        std::lock_guard<std::mutex> lg(readyMutex);
        readyFlag = true;
        quit = true;
    }
    readyCondVar.notify_one();
    f.get();
}
Upvotes: 0
Views: 3887
Reputation: 28826
Example of a multi-threaded bottom-up merge sort using the Windows threading interface, in this case with 4 threads, meant for a processor with 4 (or more) cores. Depending on the size of the array, it is about 3 times as fast as a single-threaded merge sort, mostly due to the operations that occur within each core's local L1 and L2 cache. The semaphores are used to start all threads at the same time for benchmarking purposes. On my system (Intel 2600K, 3.4 GHz), it takes about 0.5 seconds to sort 16 million 32-bit integers, versus about 1.5 seconds for a single-threaded merge sort.
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <windows.h>
// Number of ints to sort. The thread functions below each work on a quarter
// (SIZE>>2) of the array, hence the divisibility requirement.
#define SIZE (16*1024*1024) // must be multiple of 4
// One semaphore per sorting routine (Thread0..Thread3). main() creates them
// with an initial count of 0 and releases each once; every routine waits on
// its own semaphore before it starts sorting.
static HANDLE hs0; // semaphore handles
static HANDLE hs1;
static HANDLE hs2;
static HANDLE hs3;
// Handles of the threads created in main(). There is no ht0: main() calls
// Thread0 directly instead of creating a thread for it.
static HANDLE ht1; // thread handles
static HANDLE ht2;
static HANDLE ht3;
// Each ThreadN sorts the N-th quarter; Thread0 and Thread2 additionally merge
// their quarter with the following one.
static DWORD WINAPI Thread0(LPVOID); // thread functions
static DWORD WINAPI Thread1(LPVOID);
static DWORD WINAPI Thread2(LPVOID);
static DWORD WINAPI Thread3(LPVOID);
// Set in main(): pa points at the array being sorted, pb at a scratch buffer
// of the same length.
static int *pa; // pointers to buffers
static int *pb;
// Sorts a[0..n) using b[0..n) as scratch space.
void BottomUpMergeSort(int a[], int b[], size_t n);
// Merges the runs a[ll..rr) and a[rr..ee) into b[ll..ee).
void BottomUpMerge(int a[], int b[], size_t ll, size_t rr, size_t ee);
// Copies a[ll..rr) to b[ll..rr).
void BottomUpCopy(int a[], int b[], size_t ll, size_t rr);
// Returns the number of merge passes needed for n elements.
size_t GetPassCount(size_t n);
// Benchmark driver: fills an array with pseudo random ints, sorts it with
// four cooperating routines (three created threads plus this thread running
// Thread0), prints the elapsed clock ticks and verifies the result.
int main()
{
    int *array = new int[SIZE];
    int *buffer = new int[SIZE];
    pa = array;
    pb = buffer;

    // Generate pseudo random data: each value is assembled from four rand()
    // calls, contributing 8, 8, 8 and 7 bits respectively.
    for (int i = 0; i < SIZE; i++) {
        int r = ((int)((rand() >> 4) & 0xff)) << 0;
        r += ((int)((rand() >> 4) & 0xff)) << 8;
        r += ((int)((rand() >> 4) & 0xff)) << 16;
        r += ((int)((rand() >> 4) & 0x7f)) << 24;
        array[i] = r;
    }

    // Start gates (initial count 0, maximum 1), one per sorting routine.
    hs0 = CreateSemaphore(NULL, 0, 1, NULL);
    hs1 = CreateSemaphore(NULL, 0, 1, NULL);
    hs2 = CreateSemaphore(NULL, 0, 1, NULL);
    hs3 = CreateSemaphore(NULL, 0, 1, NULL);

    // The created threads wait on their semaphore before sorting.
    ht1 = CreateThread(NULL, 0, Thread1, 0, 0, 0);
    ht2 = CreateThread(NULL, 0, Thread2, 0, 0, 0);
    ht3 = CreateThread(NULL, 0, Thread3, 0, 0, 0);

    const clock_t ctTimeStart = clock();

    // Open all four gates, then run the first quarter on this thread.
    ReleaseSemaphore(hs0, 1, NULL);
    ReleaseSemaphore(hs1, 1, NULL);
    ReleaseSemaphore(hs2, 1, NULL);
    ReleaseSemaphore(hs3, 1, NULL);
    Thread0((LPVOID)NULL);

    // Thread0 has merged quarters 1+2; Thread2 merges quarters 3+4.
    WaitForSingleObject(ht2, INFINITE);

    // Merge the two sorted halves from the buffer back into the array.
    BottomUpMerge(pb, pa, 0, SIZE>>1, SIZE);

    const clock_t ctTimeStop = clock();
    std::cout << "Number of ticks " << (ctTimeStop - ctTimeStart) << std::endl;

    // Check the result: report every adjacent pair that is out of order.
    for (int i = 1; i < SIZE; i++) {
        if (array[i] < array[i-1]) {
            std::cout << "failed" << std::endl;
        }
    }

    CloseHandle(ht3);
    CloseHandle(ht2);
    CloseHandle(ht1);
    CloseHandle(hs3);
    CloseHandle(hs2);
    CloseHandle(hs1);
    CloseHandle(hs0);
    delete[] buffer;
    delete[] array;
    return 0;
}
// Sorts the 1st quarter of pa, waits for Thread1 (2nd quarter), then merges
// both quarters from pa into the first half of pb. The argument is unused.
static DWORD WINAPI Thread0(LPVOID lpvoid)
{
    const size_t quarter = SIZE>>2;
    const size_t half = SIZE>>1;

    WaitForSingleObject(hs0, INFINITE);     // wait for the start semaphore
    BottomUpMergeSort(pa, pb, quarter);     // sort 1st quarter
    WaitForSingleObject(ht1, INFINITE);     // wait for thread 1
    BottomUpMerge(pa, pb, 0, quarter, half); // merge 1st and 2nd quarter
    return 0;
}
// Sorts the 2nd quarter of pa, using the matching quarter of pb as scratch.
// The argument is unused.
static DWORD WINAPI Thread1(LPVOID lpvoid)
{
    const size_t quarter = SIZE>>2;

    WaitForSingleObject(hs1, INFINITE);                     // wait for the start semaphore
    BottomUpMergeSort(pa + quarter, pb + quarter, quarter); // sort 2nd quarter
    return 0;
}
// Sorts the 3rd quarter of pa, waits for Thread3 (4th quarter), then merges
// both quarters from pa into the second half of pb. The argument is unused.
static DWORD WINAPI Thread2(LPVOID lpvoid)
{
    const size_t quarter = SIZE>>2;
    const size_t half = SIZE>>1;

    WaitForSingleObject(hs2, INFINITE);                    // wait for the start semaphore
    BottomUpMergeSort(pa + half, pb + half, quarter);      // sort 3rd quarter
    WaitForSingleObject(ht3, INFINITE);                    // wait for thread 3
    BottomUpMerge(pa + half, pb + half, 0, quarter, half); // merge 3rd and 4th quarter
    return 0;
}
// Sorts the 4th quarter of pa, using the matching quarter of pb as scratch.
// The argument is unused.
static DWORD WINAPI Thread3(LPVOID lpvoid)
{
    const size_t quarter = SIZE>>2;
    const size_t offset = 3*quarter;

    WaitForSingleObject(hs3, INFINITE);                   // wait for the start semaphore
    BottomUpMergeSort(pa + offset, pb + offset, quarter); // sort 4th quarter
    return 0;
}
// Bottom-up merge sort of a[0..n), with b[0..n) as scratch space.
//
// Every pass merges neighbouring runs from one buffer into the other and then
// exchanges the roles of the two buffers. When the pass count is odd, the
// first pass is replaced by an in-place swap of adjacent pairs, which leaves
// an even number of buffer exchanges.
void BottomUpMergeSort(int a[], int b[], size_t n)
{
    size_t width = 1;                       // current run length

    if ((GetPassCount(n) & 1) != 0) {
        // Odd number of passes: do the width-1 pass in place.
        for (size_t i = 1; i < n; i += 2) {
            if (a[i] < a[i-1])
                std::swap(a[i], a[i-1]);
        }
        width = 2;
    }

    for (; width < n; width <<= 1) {        // one pass per run length
        for (size_t left = 0; left < n; ) {
            const size_t right = left + width;  // start of the right run
            if (right >= n) {
                // Only a left run remains: carry it over unchanged.
                BottomUpCopy(a, b, left, n);
                break;
            }
            size_t stop = right + width;        // end of the right run
            if (stop > n)
                stop = n;
            BottomUpMerge(a, b, left, right, stop);
            left = stop;
        }
        std::swap(a, b);                    // the output becomes the next input
    }
}
// Merges the two adjacent sorted runs a[ll..rr) and a[rr..ee) into b[ll..ee).
//
// Elements that compare equal are taken from the left run first, so the merge
// is stable. Either run may be empty; the other run is then copied unchanged
// (and nothing is written when both are empty).
//
// a:  source array holding both runs
// b:  destination array; only indices ll..ee-1 are written
// ll: start of the left run
// rr: end of the left run and start of the right run
// ee: end of the right run
void BottomUpMerge(int a[], int b[], size_t ll, size_t rr, size_t ee)
{
    size_t o = ll;                          // b[] index
    size_t l = ll;                          // a[] left index
    size_t r = rr;                          // a[] right index

    // The main loop below reads a[l] and a[r] before checking any bound, so
    // it requires both runs to be non-empty. Handle empty runs up front.
    if (l >= rr || r >= ee) {
        while (l < rr)
            b[o++] = a[l++];
        while (r < ee)
            b[o++] = a[r++];
        return;
    }

    for (;;) {                              // merge data
        if (a[l] <= a[r]) {
            b[o++] = a[l++];                // take from the left run
            if (l < rr)                     // left run not exhausted yet
                continue;
            do                              // copy rest of the right run
                b[o++] = a[r++];
            while (r < ee);
            return;
        }
        b[o++] = a[r++];                    // take from the right run
        if (r < ee)                         // right run not exhausted yet
            continue;
        do                                  // copy rest of the left run
            b[o++] = a[l++];
        while (l < rr);
        return;
    }
}
// Copies a[ll..rr) to the same index range of b.
// An empty range (ll >= rr) copies nothing.
void BottomUpCopy(int a[], int b[], size_t ll, size_t rr)
{
    // A pre-checked loop: a do/while would touch index ll even when the
    // range is empty.
    for (size_t i = ll; i < rr; ++i)
        b[i] = a[i];
}
// Returns the number of run-doubling passes needed to sort n elements, i.e.
// the smallest p such that 2^p >= n (0 when n is 0 or 1).
size_t GetPassCount(size_t n)
{
    size_t passes = 0;
    if (n > 1) {
        // Count the significant bits of n-1 by shifting right. Doubling a
        // counter until it reaches n would wrap around to 0, and never
        // terminate, once n exceeds the largest power of two size_t can hold.
        for (size_t rest = n - 1; rest != 0; rest >>= 1)
            passes += 1;
    }
    return passes;
}
Upvotes: 2
Reputation: 602
There are many ways to get a parallel sort without writing your own. First, there is an experimental parallelism namespace that lets you write sort(par, data.begin(), data.end()): http://en.cppreference.com/w/cpp/experimental/parallelism/existing#sort
That namespace is being merged into the standard in C++17, so it should be available in the std:: namespace at some point (https://parallelstl.codeplex.com/). There is also an older, nonstandard GNU g++ parallel sort implementation based on OpenMP: https://gcc.gnu.org/onlinedocs/libstdc++/manual/parallel_mode.html
Finally, there are many pages online describing how to write your own parallel sort in C++11. Try searching. Here is a very comprehensive page: https://software.intel.com/en-us/articles/a-parallel-stable-sort-using-c11-for-tbb-cilk-plus-and-openmp
Upvotes: 2