edrezen
edrezen

Reputation: 1628

Fast algorithm for computing intersections of N sets

I have n sets A0,A2,...An-1 holding items of a set E.

I define a configuration C as the integer made of n bits, so C has values between 0 and 2^n-1. Now, I define the following:

(C)   an item e of E is in configuration C 
       <=> for each bit b of C, if b==1 then e is in Ab, else e is not in Ab

For instance for n=3, the configuration C=011 corresponds to items of E that are in A0 and A1 but NOT in A2 (the NOT is important)

C[bitmap] is the count of elements that have exactly that presence/absence pattern in the sets. C[001] is the number of elements in A0 that aren't also in any other sets.


Another possible definition is :

(V)   an item e of E is in configuration V 
       <=> for each bit b of V, if b==1 then e is in Ab

For instance for n=3, the (V) configuration V=011 corresponds to items of E that are in A0 and A1

V[bitmap] is the count of the intersection of the selected sets. (i.e. the count of how many elements are in all of the sets where the bitmap is true.) V[001] is the number of elements in A0. V[011] is the number of elements in A0 and A1, regardless of whether or not they're also in A2.


In the following, the first picture shows items of sets A0, A1 and A2, the second picture shows size of (C) configurations and the third picture shows size of (V) configurations.

sets examples enter image description here enter image description here

I can also represent the configurations by either of two vectors:

C[001]= 5       V[001]=14
C[010]=10       V[010]=22
C[100]=11       V[100]=24
C[011]= 2       V[011]= 6
C[101]= 3       V[101]= 7
C[110]= 6       V[110]=10
C[111]= 4       V[111]= 4

What I want is to write a C/C++ function that transforms C into V as efficiently as possible. A naive approach could be the following 'transfo' function that is obviously in O(4^n) :

#include <vector>
#include <cstdio>

using namespace std;

vector<size_t> transfo (const vector<size_t>& C)
{
    vector<size_t> V (C.size());

    for (size_t i=0; i<C.size(); i++)
    {
        V[i] = 0;

        for (size_t j=0; j<C.size(); j++)
        {
            if ((j&i)==i)  { V[i] += C[j]; }
        }
    }

    return V;
} 


int main()
{
    vector<size_t> C = { 
        /* 000 */  0,
        /* 001 */  5,
        /* 010 */ 10,
        /* 011 */  2,
        /* 100 */ 11,
        /* 101 */  3,
        /* 110 */  6,
        /* 111 */  4
    };

    vector<size_t> V = transfo (C);

    for (size_t i=1; i<V.size(); i++)  {  printf ("[%2ld]  C=%2ld   V=%2ld\n", i, C[i], V[i]);  }
}

My question is : is there a more efficient algorithm than the naive one for transforming a vector C into a vector V ? And what would be the complexity of such a "good" algorithm ?

Note that I could be interested by any SIMD solution.

Upvotes: 1

Views: 779

Answers (2)

edrezen
edrezen

Reputation: 1628

According to the great observation of @CătălinFrâncu, I wrote two recursive implementations of the transformation (see code below) :

  • transfo_recursive: very straightforward recursive implementation
  • transfo_avx2 : still recursive but use AVX2 for last step of the recursion for n=3

I propose here that the sizes of the counters are coded on 32 bits and that the n value can grow up to 28.

I also wrote an iterative implementation (transfo_iterative) based on my own observation on the recursion behaviour. Actually, I guess it is close to the non recursive implementation proposed by @chtz.

Here is the benchmark code:

// compiled with: g++ -O3   intersect.cpp -march=native -mavx2 -lpthread -DNDEBUG

#include <vector>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <cmath>
#include <thread>
#include <algorithm>
#include <sys/times.h>
#include <immintrin.h>
#include <boost/align/aligned_allocator.hpp>

using namespace std;

////////////////////////////////////////////////////////////////////////////////

typedef u_int32_t Count;

// Note: alignment is important for AVX2
typedef std::vector<Count,boost::alignment::aligned_allocator<Count, 8*sizeof(Count)>>  CountVector;

typedef void (*callback) (CountVector::pointer C, size_t q);
typedef vector<pair<const char*, callback>> FunctionsVector;

unsigned int randomSeed = 0;

////////////////////////////////////////////////////////////////////////////////
double timestamp()
{
    struct timespec timet;
    clock_gettime(CLOCK_MONOTONIC, &timet);
    return timet.tv_sec + (timet.tv_nsec/ 1000000000.0);
}

////////////////////////////////////////////////////////////////////////////////
CountVector getRandomVector (size_t n)
{
    // We use the same seed, so we'll get the same random values
    srand (randomSeed);

    // We fill a vector of size q=2^n with random values
    CountVector C(1ULL<<n);
    for (size_t i=0; i<C.size(); i++)  { C[i] = rand() % (1ULL<<(8*sizeof(Count))); }
    return C;
}

////////////////////////////////////////////////////////////////////////////////
void copy_add_block (CountVector::pointer C, size_t q)
{
    for (size_t i=0; i<q/2; i++)   {  C[i] += C[i+q/2]; }
}

////////////////////////////////////////////////////////////////////////////////
void copy_add_block_avx2 (CountVector::pointer C, size_t q)
{
    __m256i* target = (__m256i*) (C);
    __m256i* source = (__m256i*) (C+q/2);

    size_t imax = q/(2*8);

    for (size_t i=0; i<imax; i++)
    {
        target[i] = _mm256_add_epi32 (source[i], target[i]);
    }
}

////////////////////////////////////////////////////////////////////////////////
// Naive approach : O(4^n)
////////////////////////////////////////////////////////////////////////////////
CountVector transfo_naive (const CountVector& C)
{
    CountVector V (C.size());

    for (size_t i=0; i<C.size(); i++)
    {
        V[i] = 0;

        for (size_t j=0; j<C.size(); j++)
        {
            if ((j&i)==i)  { V[i] += C[j]; }
        }
    }

    return V;
}

////////////////////////////////////////////////////////////////////////////////
// Recursive approach : O(n.2^n)
////////////////////////////////////////////////////////////////////////////////
void transfo_recursive (CountVector::pointer C, size_t q)
{
    if (q>1)
    {
        transfo_recursive (C+q/2, q/2);
        transfo_recursive (C,     q/2);

        copy_add_block (C, q);
    }
}

////////////////////////////////////////////////////////////////////////////////
// Iterative approach : O(n.2^n)
////////////////////////////////////////////////////////////////////////////////
void transfo_iterative (CountVector::pointer C, size_t q)
{
    size_t i = 0;

    for (size_t n=q; n>1; n>>=1, i++)
    {
        size_t d = 1<<i;

        for (ssize_t j=q-1-d; j>=0; j--)
        {
            if ( ((j>>i)&1)==0) { C[j] += C[j+d]; }
        }
    }
}

////////////////////////////////////////////////////////////////////////////////
// Recursive AVX2 approach : O(n.2^n)
////////////////////////////////////////////////////////////////////////////////

#define ROTATE1(s)  _mm256_permutevar8x32_epi32 (s, _mm256_set_epi32(0,7,6,5,4,3,2,1))
#define ROTATE2(s)  _mm256_permutevar8x32_epi32 (s, _mm256_set_epi32(0,0,7,6,5,4,3,2))
#define ROTATE4(s)  _mm256_permutevar8x32_epi32 (s, _mm256_set_epi32(0,0,0,0,7,6,5,4))

void transfo_avx2 (CountVector::pointer V, size_t N)
{
    __m256i k1 = _mm256_set_epi32 (0,0xFFFFFFFF,0,0xFFFFFFFF,0,0xFFFFFFFF,0,0xFFFFFFFF);
    __m256i k2 = _mm256_set_epi32 (0,0,0xFFFFFFFF,0xFFFFFFFF,0,0,0xFFFFFFFF,0xFFFFFFFF);
    __m256i k4 = _mm256_set_epi32 (0,0,0,0,0xFFFFFFFF,0xFFFFFFFF,0xFFFFFFFF,0xFFFFFFFF);

    if (N==8)
    {
        __m256i* source = (__m256i*) (V);

        *source = _mm256_add_epi32 (*source, _mm256_and_si256(ROTATE1(*source),k1));
        *source = _mm256_add_epi32 (*source, _mm256_and_si256(ROTATE2(*source),k2));
        *source = _mm256_add_epi32 (*source, _mm256_and_si256(ROTATE4(*source),k4));
    }
    else // if (N>8)
    {
        transfo_avx2 (V+N/2, N/2);
        transfo_avx2 (V,     N/2);

        copy_add_block_avx2  (V, N);
    }
}

#define ROTATE1_AND(s)  _mm256_srli_epi64 ((s), 32)  // odd 32bit elements
#define ROTATE2_AND(s)  _mm256_bsrli_epi128 ((s), 8) // high 64bit halves
// gcc doesn't have _mm256_zextsi128_si256
// and _mm256_castsi128_si256 doesn't guarantee zero extension
// vperm2i118 can do the same job as vextracti128, but is slower on Ryzen
#ifdef __clang__                                      // high 128bit lane
#define ROTATE4_AND(s)  _mm256_zextsi128_si256(_mm256_extracti128_si256((s),1))
#else
//#define ROTATE4_AND(s)  _mm256_castsi128_si256(_mm256_extracti128_si256((s),1))
#define ROTATE4_AND(s)  _mm256_permute2x128_si256((s),(s),0x81)  // high bit set = zero that lane
#endif

void transfo_avx2_pcordes (CountVector::pointer C, size_t q)
{
    if (q==8)
    {
        __m256i* source = (__m256i*) (C);
        __m256i tmp = *source;
        tmp = _mm256_add_epi32 (tmp, ROTATE1_AND(tmp));
        tmp = _mm256_add_epi32 (tmp, ROTATE2_AND(tmp));
        tmp = _mm256_add_epi32 (tmp, ROTATE4_AND(tmp));
        *source = tmp;
    }
    else //if (N>8)
    {
        transfo_avx2_pcordes (C+q/2, q/2);
        transfo_avx2_pcordes (C,     q/2);

        copy_add_block_avx2  (C, q);
    }
}

////////////////////////////////////////////////////////////////////////////////
// Template specialization (same as transfo_avx2_pcordes)
////////////////////////////////////////////////////////////////////////////////
template <int n>
void transfo_template (__m256i* C)
{
    const size_t q = 1ULL << n;

    transfo_template<n-1> (C);
    transfo_template<n-1> (C + q/2);

    __m256i* target = (__m256i*) (C);
    __m256i* source = (__m256i*) (C+q/2);

    for (size_t i=0; i<q/2; i++)
    {
        target[i] = _mm256_add_epi32 (source[i], target[i]);
    }
}

template <>
void transfo_template<0> (__m256i* C)
{
    __m256i* source = (__m256i*) (C);
    __m256i tmp = *source;
    tmp = _mm256_add_epi32 (tmp, ROTATE1_AND(tmp));
    tmp = _mm256_add_epi32 (tmp, ROTATE2_AND(tmp));
    tmp = _mm256_add_epi32 (tmp, ROTATE4_AND(tmp));
    *source = tmp;
}

void transfo_recur_template (CountVector::pointer C, size_t q)
{
#define CASE(n)     case 1ULL<<n: transfo_template<n> ((__m256i*)C);   break;

    q = q / 8; // 8 is the number of 32 bits items in the AVX2 registers

    // We have to 'link' the dynamic value of q with a static template specialization
    switch (q)
    {
                  CASE( 1); CASE( 2); CASE( 3); CASE( 4); CASE( 5); CASE( 6); CASE( 7); CASE( 8); CASE( 9);
        CASE(10); CASE(11); CASE(12); CASE(13); CASE(14); CASE(15); CASE(16); CASE(17); CASE(18); CASE(19);
        CASE(20); CASE(21); CASE(22); CASE(23); CASE(24); CASE(25); CASE(26); CASE(27); CASE(28); CASE(29);

        default: printf ("transfo_template undefined for q=%ld\n", q);  break;
    }
}

////////////////////////////////////////////////////////////////////////////////
// Recursive approach multithread : O(n.2^n)
////////////////////////////////////////////////////////////////////////////////
void transfo_recur_thread (CountVector::pointer C, size_t q)
{
    std::thread t1 (transfo_recur_template, C+q/2, q/2);
    std::thread t2 (transfo_recur_template, C,     q/2);
    t1.join();
    t2.join();

    copy_add_block_avx2 (C, q);
}

////////////////////////////////////////////////////////////////////////////////
void header (const char* title, const FunctionsVector& functions)
{
    printf ("\n");
    for (size_t i=0; i<functions.size(); i++)  { printf ("------------------"); }  printf ("\n");
    printf ("%s\n", title);
    for (size_t i=0; i<functions.size(); i++)  { printf ("------------------"); }  printf ("\n");
    printf ("%3s\t", "# n");
    for (auto fct : functions)  {  printf ("%20s\t", fct.first);  }
    printf ("\n");
}

////////////////////////////////////////////////////////////////////////////////
// Check that alternative implementations provide the same result as the naive one
////////////////////////////////////////////////////////////////////////////////
void check (const FunctionsVector& functions, size_t nmin, size_t nmax)
{
    header ("CHECK (0 values means similar to naive approach)", functions);

    for (size_t n=nmin; n<=nmax; n++)
    {
        printf ("%3ld\t", n);

        CountVector reference = transfo_naive (getRandomVector(n));

        for (auto fct : functions)
        {
            // We call the (in place) transformation
            CountVector C = getRandomVector(n);
            (*fct.second) (C.data(), C.size());

            int nbDiffs= 0;
            for (size_t i=0; i<C.size(); i++)
            {
                if (reference[i]!=C[i]) { nbDiffs++; }
            }
            printf ("%20ld\t", nbDiffs);
        }
        printf ("\n");
    }
}

////////////////////////////////////////////////////////////////////////////////
// Performance test
////////////////////////////////////////////////////////////////////////////////
void performance (const FunctionsVector& functions, size_t nmin, size_t nmax)
{
    header ("PERFORMANCE", functions);

    for (size_t n=nmin; n<=nmax; n++)
    {
        printf ("%3ld\t", n);
        for (auto fct : functions)
        {
            // We compute the average time for several executions
            // We use more executions for small n values in order
            // to have more accurate results
            size_t nbRuns = 1ULL<<(2+nmax-n);
            vector<double> timeValues;

            // We run the test several times
            for (size_t r=0; r<nbRuns; r++)
            {
                // We don't want to measure time for vector fill
                CountVector C = getRandomVector(n);

                double t0 = timestamp();
                (*fct.second) (C.data(), C.size());
                double t1 = timestamp();

                timeValues.push_back (t1-t0);
            }

            // We sort the vector of times in order to get the median value
            std::sort (timeValues.begin(), timeValues.end());

            double median = timeValues[timeValues.size()/2];
            printf ("%20lf\t", log(1000.0*1000.0*median)/log(2));
        }
        printf ("\n");
    }
}

////////////////////////////////////////////////////////////////////////////////
//
////////////////////////////////////////////////////////////////////////////////
int main (int argc, char* argv[])
{
    size_t nmin = argc>=2 ? atoi(argv[1]) : 14;
    size_t nmax = argc>=3 ? atoi(argv[2]) : 28;

    // We get a common random seed
    randomSeed = time(NULL);

    FunctionsVector functions = {
        make_pair ("transfo_recursive",        transfo_recursive),
        make_pair ("transfo_iterative",        transfo_iterative),
        make_pair ("transfo_avx2",             transfo_avx2),
        make_pair ("transfo_avx2_pcordes",     transfo_avx2_pcordes),
        make_pair ("transfo_recur_template",   transfo_recur_template),
        make_pair ("transfo_recur_thread",     transfo_recur_thread)
    };

    // We check for some n that alternative implementations
    // provide the same result as the naive approach
    check (functions, 5, 15);

    // We run the performance test
    performance (functions, nmin, nmax);
}

And here is the performance graph: enter image description here

One can observe that the simple recursive implementation is pretty good, even compared to the AVX2 version. The iterative implementation is a little bit disappointing but I made no big effort to optimize it.

Finally, for my own use case with 32 bits counters and for n values up to 28, these implementations are obviously ok for me compared to the initial "naive" approach in O(4^n).


Update

Following some remarks from @PeterCordes and @chtz, I added the following implementations:

  • transfo-avx2-pcordes : the same as transfo-avx2 with some AVX2 optimizations
  • transfo-recur-template : the same as transfo-avx2-pcordes but using C++ template specialization for implementing recursion
  • transfo-recur-thread : usage of multithreading for the two initial recursive calls of transfo-recur-template

Here is the updated benchmark result:

enter image description here

A few remarks about this result:

  1. the AVX2 implementations are logically the best options but maybe not with the maximum potential x8 speedup with counters of 32 bits
  2. among the AVX2 implementations, the template specialization brings a little speedup but it almost fades for bigger values for n
  3. the simple two-threads version has bad results for n<20; for n>=20, there is always a little speedup but far from a potential 2x.

Upvotes: 3

Cătălin Fr&#226;ncu
Cătălin Fr&#226;ncu

Reputation: 1199

Well, you are trying to compute 2n values, so you cannot do better than O(2n).

The naive approach starts from the observation that V[X] is obtained by fixing all the 1 bits in X and iterating over all the possible values where the 0 bits are. For example,

V[010] = C[010] + C[011] + C[110] + C[111]

But this approach performs O(2n) additions for every element of V, yielding a total complexity of O(4n).

Here is an O(n × 2n) algorithm. I too am curious if an O(2n) algorithm exists.

Let n = 4. Let us consider the full table of V versus C. Each line in the table below corresponds to one value of V and this value is calculated by summing up the columns marked with a *. The layout of * symbols can be easily deduced from the naive approach.

    |0000|0001|0010|0011|0100|0101|0110|0111||1000|1001|1010|1011|1100|1101|1110|1111
0000| *  | *  | *  | *  | *  | *  | *  | *  || *  | *  | *  | *  | *  | *  | *  | *  
0001|    | *  |    | *  |    | *  |    | *  ||    | *  |    | *  |    | *  |    | *  
0010|    |    | *  | *  |    |    | *  | *  ||    |    | *  | *  |    |    | *  | *  
0011|    |    |    | *  |    |    |    | *  ||    |    |    | *  |    |    |    | *  
0100|    |    |    |    | *  | *  | *  | *  ||    |    |    |    | *  | *  | *  | *  
0101|    |    |    |    |    | *  |    | *  ||    |    |    |    |    | *  |    | *  
0110|    |    |    |    |    |    | *  | *  ||    |    |    |    |    |    | *  | *  
0111|    |    |    |    |    |    |    | *  ||    |    |    |    |    |    |    | *  
-------------------------------------------------------------------------------------
1000|    |    |    |    |    |    |    |    || *  | *  | *  | *  | *  | *  | *  | *  
1001|    |    |    |    |    |    |    |    ||    | *  |    | *  |    | *  |    | *  
1010|    |    |    |    |    |    |    |    ||    |    | *  | *  |    |    | *  | *  
1011|    |    |    |    |    |    |    |    ||    |    |    | *  |    |    |    | *  
1100|    |    |    |    |    |    |    |    ||    |    |    |    | *  | *  | *  | *  
1101|    |    |    |    |    |    |    |    ||    |    |    |    |    | *  |    | *  
1110|    |    |    |    |    |    |    |    ||    |    |    |    |    |    | *  | *  
1111|    |    |    |    |    |    |    |    ||    |    |    |    |    |    |    | *  

Notice that the top-left, top-right and bottom-right corners contain identical layouts. Therefore, we can perform some calculations in bulk as follows:

  1. Compute the bottom half of the table (the bottom-right corner).
  2. Add the values to the top half.
  3. Compute the top-left corner.

If we let q = 2n, Thus the recurrent complexity is

T(q) = 2T(q/2) + O(q)

which solves using the Master Theorem to

T(q) = O(q log q)

or, in terms of n,

T(n) = O(n × 2n)

Upvotes: 5

Related Questions