kakrafoon

Reputation: 506

C++ STL algorithm (list sort) OpenMP/multithreaded implementations

I am trying to speed up one of the kernels of my code, which essentially boils down to a sort (I am running this on a multi-core CPU). I learned from this post (STL algorithms and concurrent programming) that some STL algorithms can be parallelized, for example using OpenMP (see below).

I got quite decent speedup using __gnu_parallel::sort

E.g.

__gnu_parallel::sort(std::begin(X), std::end(X), [](const auto &a, const auto &b){ return a.member > b.member; });

It turns out that std::list is a much better container for my data. However, std::list::sort does not seem to have a parallel/multithreaded implementation.

The post linked above is dated 2010. I was wondering what the most current wisdom on this is.

Upvotes: 1

Views: 1489

Answers (2)

Serg Kryvonos

Reputation: 4677

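Since C++17 you may consider the parallel sort algorithms, e.g. std::sort(std::execution::par, ...). Note that std::sort requires random-access iterators, so it cannot be called on a std::list directly.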
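
One way to use this with a std::list is to round-trip the elements through a std::vector. The following is only a minimal sketch under stated assumptions: SortListViaVector and the opt-in macro SORT_LIST_PARALLEL are hypothetical names introduced here, and whether the execution policy actually runs in parallel depends on the toolchain (some standard libraries need an extra threading backend to be linked).

```cpp
#include <algorithm>
#include <cassert>
#include <iterator>
#include <list>
#include <vector>
#ifdef SORT_LIST_PARALLEL        // hypothetical opt-in macro, not a standard one
#include <execution>
#endif

// Sort a std::list by moving its elements into a std::vector, sorting
// there, and moving them back into the existing list nodes.
// comp is any strict weak ordering on T. The sort is not stable.
template <typename T, typename Comp>
void SortListViaVector(std::list<T>& ll, Comp comp)
{
    std::vector<T> tmp(std::make_move_iterator(ll.begin()),
                       std::make_move_iterator(ll.end()));
#ifdef SORT_LIST_PARALLEL
    std::sort(std::execution::par, tmp.begin(), tmp.end(), comp);
#else
    std::sort(tmp.begin(), tmp.end(), comp);    // serial fallback
#endif
    assert(tmp.size() == ll.size());
    std::move(tmp.begin(), tmp.end(), ll.begin());
}
```

The cost is a temporary copy of the data and two extra passes over it, and iterators into the list end up referring to different values afterwards, so this only makes sense if the sort dominates the run time.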

Upvotes: 1

rcgldr

Reputation: 28826

In the case of Microsoft (prior to Visual Studio 2015), std::list::sort uses an array of lists, where array[i] is either an empty list or a list of size 2 to the power i (1, 2, 4, 8, ...). Nodes are taken from the original list one at a time and merged into the array, then the array is merged to form a single sorted list. Assuming compare overhead isn't excessive, this is a memory-bound process, and multi-threading won't help: splitting the list between threads requires scanning it first, which nearly doubles the number of memory read operations. Here is example code for this type of list sort. The compare is restricted to < (not <=), so the operands have to be reversed to keep the sort stable.

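#include <stddef.h>                     /* NULL */

/* assumed node layout; the original answer does not show it */
typedef struct NODE_ {
    struct NODE_ *next;
    int data;
} NODE;

NODE * MergeLists(NODE *pSrc1, NODE *pSrc2);

#define NUMLISTS 32                     /* number of lists */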
NODE * SortList(NODE *pList)
{
NODE * aList[NUMLISTS];                 /* array of lists */
NODE * pNode;
NODE * pNext;
int i;
    if(pList == NULL)                   /* check for empty list */
        return NULL;
    for(i = 0; i < NUMLISTS; i++)       /* zero array */
        aList[i] = NULL;
    pNode = pList;                      /* merge nodes into array */
    while(pNode != NULL){
        pNext = pNode->next;
        pNode->next = NULL;
        for(i = 0; (i < NUMLISTS) && (aList[i] != NULL); i++){
            pNode = MergeLists(aList[i], pNode);
            aList[i] = NULL;
        }
        if(i == NUMLISTS)
            i--;
        aList[i] = pNode;
        pNode = pNext;
    }
    pNode = NULL;                       /* merge array into one list */
    for(i = 0; i < NUMLISTS; i++)
        pNode = MergeLists(aList[i], pNode);
    return pNode;
}

NODE * MergeLists(NODE *pSrc1, NODE *pSrc2)
{
NODE *pDst = NULL;                      /* destination head ptr */
NODE **ppDst = &pDst;                   /* ptr to head or prev->next */
    if(pSrc1 == NULL)
        return pSrc2;
    if(pSrc2 == NULL)
        return pSrc1;
    while(1){
        if(pSrc2->data < pSrc1->data){  /* if src2 < src1 */
            *ppDst = pSrc2;
            pSrc2 = *(ppDst = &pSrc2->next);
            if(pSrc2 == NULL){
                *ppDst = pSrc1;
                break;
            }
        } else {                        /* src1 <= src2 */
            *ppDst = pSrc1;
            pSrc1 = *(ppDst = &pSrc1->next);
            if(pSrc1 == NULL){
                *ppDst = pSrc2;
                break;
            }
        }
    }
    return pDst;
}
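
For comparison, the same array-of-lists idea can be sketched on top of std::list itself, using its splice() and merge() members. This is only an illustration of the scheme described above, not Microsoft's actual implementation; SortListWithBins and kNumBins are names made up for this sketch, and T is assumed to provide operator<.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <list>

// Bottom-up merge sort with an array of lists ("bins").
// bins[i] is either empty or holds a sorted run of 2^i nodes
// (the last bin may grow larger).
template <typename T>
void SortListWithBins(std::list<T>& src)
{
    constexpr std::size_t kNumBins = 32;
    std::array<std::list<T>, kNumBins> bins;
    std::list<T> carry;                       // run being pushed up the bins

    while (!src.empty()) {
        // take one node from the front of the source list (no allocation)
        carry.splice(carry.begin(), src, src.begin());
        std::size_t i = 0;
        for (; i < kNumBins && !bins[i].empty(); ++i) {
            // bins[i] holds older nodes; merge() keeps them first on ties,
            // which keeps the sort stable
            bins[i].merge(carry);
            carry.swap(bins[i]);              // carry = merged run, bin empty
        }
        if (i == kNumBins)                    // all bins were full: reuse last
            --i;
        bins[i].swap(carry);                  // store run, carry becomes empty
    }
    assert(carry.empty());

    // merge the bins into one list, from the smallest run to the largest
    for (auto& bin : bins) {
        bin.merge(carry);
        carry.swap(bin);
    }
    src.swap(carry);
}
```

Because only splice(), merge(), and swap() are used, no nodes are allocated or copied; the temporary lists are empty when the function returns.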

Update: Visual Studio 2015 and later switched to using iterators instead of lists for the merge sort. This eliminates allocation issues such as having no default allocator, and since the merging is done via splice() all on the same list, it provides exception safety: if a user compare throws an exception, the list is left reordered, but all the nodes are still there (assuming splice never throws an exception). VS2015 also switched to a top-down merge sort, although a bottom-up merge sort based on iterators could have been used. I'm not sure why the switch to top down was made, as it is about 40% slower for a large list (well beyond cache size) with randomly scattered nodes.

Example iterator-based code follows. Each iterator in the array points to the first node of a run of size 2 to the power i, or it is equal to list.end() to indicate an empty run. The end of a run is the first prior non-"empty" entry in the array or a local variable iterator (all runs in the array are adjacent runs). All merges involve adjacent runs.

The merge function takes 3 iterator parameters: an iterator to the first node of the left run, an iterator to the first node of the right run (which is also the end of the left run), and an iterator to the end of the right run (which may be an iterator to the first node of the following run or list.end()).

#include <cstddef>                      // size_t
#include <list>

template <typename T>
typename std::list<T>::iterator Merge(std::list<T> &ll,
                    typename std::list<T>::iterator li,
                    typename std::list<T>::iterator ri,
                    typename std::list<T>::iterator ei);

// iterator array size
#define ASZ 32

template <typename T>
void SortList(std::list<T> &ll)
{
    if (ll.size() < 2)                  // return if nothing to do
        return;
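    // "typename" is required here because the iterator type depends on T
    typename std::list<T>::iterator ai[ASZ];     // array of iterators
    typename std::list<T>::iterator li;          // left   iterator
    typename std::list<T>::iterator ri;          // right  iterator
    typename std::list<T>::iterator ei;          // end    iterator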
    size_t i;
    for (i = 0; i < ASZ; i++)           // "empty" array
        ai[i] = ll.end();
    // merge nodes into array
    for (ei = ll.begin(); ei != ll.end();) {
        ri = ei++;
        for (i = 0; (i < ASZ) && ai[i] != ll.end(); i++) {
            ri = Merge(ll, ai[i], ri, ei);
            ai[i] = ll.end();
        }
        if(i == ASZ)
            i--;
        ai[i] = ri;
    }
    // merge array into single list
    ei = ll.end();                              
    for(i = 0; (i < ASZ) && ai[i] == ei; i++);
    ri = ai[i++];
    while(1){
        for( ; (i < ASZ) && ai[i] == ei; i++);
        if (i == ASZ)
            break;
        li = ai[i++];
        ri = Merge(ll, li, ri, ei);
    }
}

template <typename T>
typename std::list<T>::iterator Merge(std::list<T> &ll,
                             typename std::list<T>::iterator li,
                             typename std::list<T>::iterator ri,
                             typename std::list<T>::iterator ei)
{
    // ni = first node of the merged run (the smaller of the two heads)
    typename std::list<T>::iterator ni = (*ri < *li) ? ri : li;
    while(1){
        if(*ri < *li){
            ll.splice(li, ll, ri++);
            if(ri == ei)
                return ni;
        } else {
            if(++li == ri)
                return ni;
        }
    }
}
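
The exception-safety point can be checked with a small experiment. The sketch below restates the splice-based merge (as SpliceMerge, so the block stands alone) and runs it on a hypothetical element type Flaky whose operator< throws once a made-up compare budget, g_compares_left, is used up. All of these names are introduced here purely for illustration.

```cpp
#include <cassert>
#include <iterator>
#include <list>
#include <stdexcept>

struct Flaky { int key; };              // hypothetical element type

int g_compares_left = 3;                // hypothetical compare budget

// Compare that throws once the budget is exhausted.
bool operator<(const Flaky& a, const Flaky& b)
{
    if (g_compares_left-- == 0)
        throw std::runtime_error("compare failed");
    return a.key < b.key;
}

// Merge adjacent runs [li, ri) and [ri, ei) of ll in place via splice().
// Both runs must be non-empty. Returns the first node of the merged run.
template <typename T>
typename std::list<T>::iterator SpliceMerge(std::list<T>& ll,
                        typename std::list<T>::iterator li,
                        typename std::list<T>::iterator ri,
                        typename std::list<T>::iterator ei)
{
    assert(li != ri && ri != ei);
    typename std::list<T>::iterator ni = (*ri < *li) ? ri : li;
    while (true) {
        if (*ri < *li) {
            ll.splice(li, ll, ri++);    // relink one node, no allocation
            if (ri == ei)
                return ni;
        } else {
            if (++li == ri)
                return ni;
        }
    }
}

// Merge the runs [begin, ri) and [ri, end); report whether the compare threw.
bool MergeThrows(std::list<Flaky>& ll, std::list<Flaky>::iterator ri)
{
    try {
        SpliceMerge(ll, ll.begin(), ri, ll.end());
    } catch (const std::runtime_error&) {
        return true;
    }
    return false;
}
```

With a list holding the keys 1, 4, 7, 2, 5, 8 split after the third node, the compare throws part-way through the merge. The list is then only partially merged, but since nodes are only ever relinked within the same list, its size is unchanged.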

Upvotes: 1
