Steg Verner

Reputation: 913

Performing intersection of vectors in C++

I have 200 vectors, of sizes ranging from 1 to 4000000 elements, stored in vecOfVec. I need to intersect these vectors with a single vector "vecSearched" of 9000+ elements. I tried to do this using the following code; however, using the perf tool I found that this intersection is the bottleneck in my code. Is there some way to perform the intersection more efficiently?

#include <cstdlib>
#include <iostream>
#include <vector>

using namespace std;

int main(int argc, char** argv) {
  vector<vector<unsigned> > vecOfVec; //contains 120 vectors of size ranging from 1 to 2000000 elements. All vectors in vecOfVec are sorted
  vector<unsigned> vecSearched; //vector searched in contains 9000+ elements. Vectors in vecSearched are sorted
  for(unsigned kbt=0; kbt<vecOfVec.size(); kbt++)
  {                                  
     //first find first 9 values spaced at equi-distant places, use these 9 values for performing comparisons
     vector<unsigned> equiSpacedVec;                             

     if(((vecSearched[0]))>vecOfVec[kbt][(vecOfVec[kbt].size())-1]) //if beginning of searched vector > last value present in individual vectors of vecOfVec then continue
     {
         continue;                             
     }                         

     unsigned elementIndex=0; //used for iterating over equiSpacedVec                             
     unsigned i=0; //used for iterating over individual buckets vecOfVec[kbt].second
     //search for value in bucket and store it in bucketValPos
     bool firstRun=true;             
     for(vector<unsigned>::iterator itValPos=vecSearched.begin();itValPos!=vecSearched.end();++itValPos)
     {
         //construct a summarized vector out of individual vectors of vecOfVec
         if(firstRun)
         {
             firstRun=false;
             unsigned elementIndex1=0; //used for iterating over equiSpacedVec
             while(elementIndex1<(vecOfVec[kbt].size())) //create a small vector for skipping over the remaining vectors
             {
                  if((elementIndex1+(10000))<(vecOfVec[kbt].size()))
                     elementIndex1+=10000;
                  else
                     break;
                 equiSpacedVec.push_back(vecOfVec[kbt][elementIndex1]);
             }          
         }
         //skip individual vectors of vecOfVec using summarized vector constructed above
         while((!(equiSpacedVec.empty()))&&(equiSpacedVec.size()>(elementIndex+1))&&((*itValPos)>equiSpacedVec[elementIndex+1])){
             elementIndex+=1;
             if((i+100)<(vecOfVec[kbt].size()))
                 i+=100;
         }            
         unsigned j=i;
         while((j<vecOfVec[kbt].size())&&((*itValPos)>vecOfVec[kbt][j])){ //bounds check must come before the dereference
             j++;
         }
         if(j>(vecOfVec[kbt].size()-1)) //element not found even at last position.
         {
             break;
         }              

         if((*itValPos)==vecOfVec[kbt][j])
         {                                     
             //store intersection result
         }
     }
  }
    return 0;
}

Upvotes: 6

Views: 679

Answers (4)

Leo Goodstadt

Reputation: 2637

The simplest way to intersect two sorted vectors is to iterate through both of them at the same time, and only increment whichever iterator has the smaller value. This is guaranteed to require the smallest number of comparisons if both vectors contain unique values. In fact, you can use the same code for all sorts of sorted collections, e.g. linked lists.

@Nikos Athanasiou (answer above) gives lots of useful tips for speeding up your code, like using skip lists and SIMD comparisons. However, your dataset is so tiny that even the straightforward naive code here runs blindingly fast...

template<typename CONT1, typename CONT2, typename OP_MARKIDENTICAL>
inline
void set_mark_overlap(  const CONT1& cont1,
                        const CONT2& cont2,
                        OP_MARKIDENTICAL op_markidentical)
{
    auto ii = cont1.cbegin();
    auto end1 = cont1.cend();
    auto jj = cont2.cbegin();
    auto end2 = cont2.cend();

    if (cont1.empty() || cont2.empty())
        return;

    for (;;)
    {
        // increment iterator to container 1 if it is less
        if (*ii < *jj)
        {
            if (++ii == end1)
                break;
        }
        // increment iterator to container 2 if it is less
        else if (*jj < *ii)
        {
            if (++jj == end2)
                break;
        }
        // same values
        // increment both iterators
        else
        {
            op_markidentical(*ii);
            ++ii;
            if (ii == end1)
                break;
            // 
            // Comment if container1 can contain duplicates
            // 
            ++jj;
            if (jj == end2)
                break;
        }
    }
}

Here is how you might use this code:

template<typename TT>
struct op_store
{
    vector<TT>& store;
    op_store(vector<TT>& store): store(store){}
    void operator()(TT val){store.push_back(val);}
};


vector<unsigned> first{1,2,3,4,5,6};
vector<unsigned> second{1,2,5,6, 7,9};
vector<unsigned> overlap;
set_mark_overlap(   first, second, op_store<unsigned>(overlap));
for (const auto& ii : overlap)
    std::cout << ii << ",";
std::cout << "\n";
// 1,2,5,6

This code assumes that neither vector contains duplicates. If any of your vecOfVec vectors contains duplicates and you want each duplicate to be reported, you need to comment out the indicated code above. If your vecSearched vector contains duplicates, it is not clear what the appropriate response would be...

In your case, the code to store matching values would be just these three lines:

// results
vector<vector<unsigned> > results(vecOfVec.size()); 
for (unsigned ii = 0; ii < vecOfVec.size(); ++ii) 
    set_mark_overlap(vecSearched, vecOfVec[ii], op_store<unsigned>(results[ii]));

In terms of optimisation, your problem has two characteristics: 1) One list is always much shorter than the other 2) The shorter list is reused while the longer list is new to every comparison.

Up-front costs (of pre-processing, e.g. for skip lists as suggested by @Nikos Athanasiou) can be amortised over the short list of 9000 elements (which is used again and again) but not over the longer lists, each of which is seen only once.

I imagine most of the skipping happens in the longer lists, so skip lists may not be a panacea. How about a sort of dynamic skip, so that you jump by N (j += 4,000,000 / 9,000) or by one (++j) as container two catches up (in the code above)? If you have jumped too far, you can use a mini binary search to find the right amount to increment j by.

Because of this asymmetry in list lengths, I can't see recoding with SIMD helping: we need to minimise the number of comparisons to fewer than (N+M), rather than increase the speed of each comparison. However, it depends on your data. Code up and time things!

Here is the test code to create some vectors of random numbers and check that they are present

#include <iostream>
#include <vector>
#include <unordered_set>
#include <exception>
#include <algorithm>
#include <limits>
#include <random>

using namespace std;

template<typename TT>
struct op_store
{
    std::vector<TT>& store;
    op_store(vector<TT>& store): store(store){}
    void operator()(TT val){store.push_back(val);}
};




void fill_vec_with_unique_random_values(vector<unsigned>& cont, unordered_set<unsigned>& curr_values)
{
    static random_device rd;
    static mt19937 e1(rd());
    static uniform_int_distribution<unsigned> uniform_dist(0, std::numeric_limits<unsigned>::max());


    for (auto& jj : cont)
    {
        for (;;) 
        {
            unsigned new_value = uniform_dist(e1);
            // make sure all values are unique
            if (curr_values.count(new_value) == 0)
            {
                curr_values.insert(new_value);
                jj = new_value;
                break;
            }
        }
    }
}

int main (int argc, char *argv[])
{
    static random_device rd;
    static mt19937 e1(rd());

    //  vector searched in contains 9000+ elements. Vectors in vecSearched are sorted
    vector<unsigned> vecSearched(9000);
    unordered_set<unsigned> unique_values_9000;
    fill_vec_with_unique_random_values(vecSearched, unique_values_9000);

    //
    //  Create test vectors (5 here; 120 in the real data) of size ranging from 1 to 2000000 elements. All vectors in vecOfVec are sorted
    // 
    vector<vector<unsigned> > vecOfVec(5); 
    normal_distribution<> vec_size_normal_dist(1000000U, 500000U);
    for (unsigned ii = 0; ii < vecOfVec.size(); ++ii) 
    {
        std::cerr << " Create Random data set" << ii << " ...\n";
        auto vec_size = min(2000000U, static_cast<unsigned>(vec_size_normal_dist(e1)));
        vecOfVec[ii].resize(vec_size);

        // Do NOT share values with the 9000. We will manually add these later
        unordered_set<unsigned> unique_values(unique_values_9000);
        fill_vec_with_unique_random_values(vecOfVec[ii], unique_values);
    }



    // insert half of vecSearched in our test vectors so that we know what we are going to find
    vector<unsigned> correct_results(begin(vecSearched), begin(vecSearched) + 4500);
    for (unsigned ii = 0; ii < vecOfVec.size(); ++ii) 
        vecOfVec[ii].insert(vecOfVec[ii].end(), begin(correct_results), end(correct_results));

    // Make sure everything is sorted
    std::cerr << " Sort data ...\n";
    for (unsigned ii = 0; ii < vecOfVec.size(); ++ii) 
        sort(begin(vecOfVec[ii]), end(vecOfVec[ii]));
    sort(begin(vecSearched), end(vecSearched));
    sort(begin(correct_results), end(correct_results));


    std::cerr << " Match ...\n";
    // results
    vector<vector<unsigned> > results(vecOfVec.size()); 
    for (unsigned ii = 0; ii < vecOfVec.size(); ++ii) 
    {
        std::cerr << ii << " done\n";
        set_mark_overlap(vecSearched, vecOfVec[ii], op_store<unsigned>(results[ii]));

        // check all is well
        if (results[ii] != correct_results)
            throw runtime_error("Oops");
    }
    return(0);
}

Upvotes: 2

Nikos Athanasiou

Reputation: 31597

Your problem is a very popular one. Since you have no data correlating the vectors to intersect, it boils down to speeding up the intersection between two vectors, and there are basically two approaches to it:

1. Without any preprocessing

This is usually addressed by three things:

  • Reducing the number of comparisons. E.g. for small vectors (sized 1 to 50) you should binary search each element, to avoid traversing all the 9000+ elements of the subject vector.

  • Improving code quality to reduce branch mispredictions. E.g. observing that the resulting set will usually be smaller than the input sets, you could transform code like this:

    while (Apos < Aend && Bpos < Bend) {
        if (A[Apos] == B[Bpos]) {
            C[Cpos++] = A[Apos];
            Apos++; Bpos++;
        }
        else if (A[Apos] > B[Bpos]) {
            Bpos++;
        } 
        else {
          Apos++;
        } 
    }
    

    into code that "unrolls" such comparisons, creating easier-to-predict branches (example for block size = 2):

    while (1) {
        Adat0 = A[Apos]; Adat1 = A[Apos + 1]; 
        Bdat0 = B[Bpos]; Bdat1 = B[Bpos + 1];
        if (Adat0 == Bdat0) { 
            C[Cpos++] = Adat0;
        }
        else if (Adat0 == Bdat1) {
            C[Cpos++] = Adat0;
            goto advanceB;
        }
        else if (Adat1 == Bdat0) {
            C[Cpos++] = Adat1;
            goto advanceA;
        }
        if (Adat1 == Bdat1) {
            C[Cpos++] = Adat1;
            goto advanceAB;
        }
        else if (Adat1 > Bdat1) goto advanceB;
        else goto advanceA;
    advanceA:
        Apos+=2;
        if (Apos >= Aend) { break; } else { continue; }
    advanceB:
        Bpos+=2;
        if (Bpos >= Bend) { break; } else { continue; }
    advanceAB:
        Apos+=2;  Bpos+=2;
        if (Apos >= Aend || Bpos >= Bend) { break; }
    }
    //  fall back to naive algorithm for remaining elements
    
  • Using SIMD instructions to perform block operations

These techniques are hard to describe in a QA context but you can read about them (plus relevant optimizations like the if conversion) here and here or find implementation elements here

2. With preprocessing

This IMHO is a better way for your case because you have a single subject vector of 9000+ elements. You could build an interval tree out of it, or simply find a way to index it, e.g. make a structure that will speed up searches against it:

vector<unsigned> subject(9000+); 
vector<range>    index(9000/M); 

where range is a struct like

struct range {
    unsigned min, max; 
}; 

thus creating a sequence of ranges like so

[0, 100], [101, 857], ... [33221, 33500]

that will allow you to skip many, many comparisons when doing the intersection (for example, if an element of the other set is larger than the max of a subrange, you can skip that subrange completely)

3. Parallelize

Yep, there's always a third element in a list of two :P. Once you have optimized the procedure enough (and only then), break your work into chunks and run them in parallel. The problem fits an embarrassingly parallel pattern, so 200 vectors vs 1 should definitely run as "50 vs 1, four times concurrently"

Test, measure, redesign!!

Upvotes: 5

m47h
m47h

Reputation: 1723

Using std::set_intersection could help, but I do not know whether it improves the overall speed:

vector<vector<unsigned int> > vecOfVec(200);
vector<unsigned int> vecSearched; 
set<unsigned int> intersection;
for(auto it = vecOfVec.begin(); it != vecOfVec.end(); ++it)
{
  std::set_intersection(it->begin(), it->end(), vecSearched.begin(), vecSearched.end(), std::inserter(intersection, intersection.begin()));
}

Upvotes: 1

marom
marom

Reputation: 5230

If I understood your code correctly, then if N is the number of vectors and M the number of elements inside each vector, your algorithm is roughly O(N * M^2). The 'bucket' strategy improves things a bit, but its effect is difficult to evaluate at first sight.

I would suggest you work on sorted vectors and intersect them as sorted ones. Something like this:

vector<vector<unsigned> > vecOfVec;
vector<unsigned> vecSearched ;
for (vector<unsigned> v : vecOfVec) // yes, a copy
{
   std::sort(v.begin(), v.end()) ;
   if (vecSearched.empty()) // first run detection
      vecSearched = v ; 
   else
   {  // compute intersection of v and vecSearched
      auto vit = v.begin() ;
      auto vend = v.end() ;
      auto sit = vecSearched.begin() ;
      auto send = vecSearched.end() ;
      vector<unsigned> result ;
      while (vit != vend && sit != send)
      {
         if (*vit < *sit)
             ++vit ;
         else if (*vit == *sit)
         {
             result.push_back(*vit) ; 
             ++vit ;
             ++sit ;
         }
         else //  *vit > *sit
            ++sit ;
      }
      vecSearched = result ;
   }
}

The code is untested; anyway, the idea behind it is that intersecting sorted vectors is easy since you can compare the two iterators (vit, sit) and advance whichever points to the smaller value. Intersection is then linear in M, and the whole complexity is O(N * M * log(M)), where the log(M) comes from sorting.

Upvotes: 4
