Reputation: 913
I have 200 vectors of size ranging from 1 to 4000000 stored in vecOfVec. I need to intersect these vectors with a single vector "vecSearched" of 9000+ elements. I tried to do this using the following code, but using the perf tool I found that the intersection is the bottleneck in my code. Is there some way I can perform the intersection more efficiently?
#include <cstdlib>
#include <iostream>
#include <vector>
using namespace std;

int main(int argc, char** argv) {
    vector<vector<unsigned> > vecOfVec; //contains 120 vectors of size ranging from 1 to 2000000 elements. All vectors in vecOfVec are sorted
    vector<unsigned> vecSearched; //vector searched in contains 9000+ elements. Vectors in vecSearched are sorted

    for(unsigned kbt=0; kbt<vecOfVec.size(); kbt++)
    {
        //first find first 9 values spaced at equi-distant places, use these 9 values for performing comparisons
        vector<unsigned> equiSpacedVec;
        if(vecSearched[0]>vecOfVec[kbt][(vecOfVec[kbt].size())-1]) //if beginning of searched vector > last value present in individual vectors of vecOfVec then continue
        {
            continue;
        }
        unsigned elementIndex=0; //used for iterating over equiSpacedVec
        unsigned i=0; //used for iterating over individual buckets vecOfVec[kbt]
        //search for value in bucket and store it in bucketValPos
        bool firstRun=true;
        for(vector<unsigned>::iterator itValPos=vecSearched.begin();itValPos!=vecSearched.end();++itValPos)
        {
            //construct a summarized vector out of individual vectors of vecOfVec
            if(firstRun)
            {
                firstRun=false;
                unsigned elementIndex1=0; //used for iterating over equiSpacedVec
                while(elementIndex1<(vecOfVec[kbt].size())) //create a small vector for skipping over the remaining vectors
                {
                    if((elementIndex1+(10000))<(vecOfVec[kbt].size()))
                        elementIndex1+=10000;
                    else
                        break;
                    equiSpacedVec.push_back(vecOfVec[kbt][elementIndex1]);
                }
            }
            //skip individual vectors of vecOfVec using summarized vector constructed above
            while((!(equiSpacedVec.empty()))&&(equiSpacedVec.size()>(elementIndex+1))&&((*itValPos)>equiSpacedVec[elementIndex+1])){
                elementIndex+=1;
                if((i+100)<(vecOfVec[kbt].size()))
                    i+=100;
            }
            unsigned j=i;
            while((j<vecOfVec[kbt].size())&&((*itValPos)>vecOfVec[kbt][j])){ //bounds check before element access
                j++;
            }
            if(j>(vecOfVec[kbt].size()-1)) //element not found even at last position.
            {
                break;
            }
            if((*itValPos)==vecOfVec[kbt][j])
            {
                //store intersection result
            }
        }
    }
    return 0;
}
Upvotes: 6
Views: 679
Reputation: 2637
The simplest way to intersect two sorted vectors is to iterate through both of them at the same time, and only increment whichever iterator points to the smaller value. This is guaranteed to require the smallest number of comparisons if both vectors contain unique values. In fact, the same code works for all sorts of sorted collections, e.g. linked lists.
@Nikos Athanasiou (answer above) gives lots of useful tips to speed up your code, such as skip lists and SIMD comparisons. However, your data set is so tiny that even the straightforward naive code here runs blindingly fast...
template<typename CONT1, typename CONT2, typename OP_MARKIDENTICAL>
inline
void set_mark_overlap( const CONT1& cont1,
                       const CONT2& cont2,
                       OP_MARKIDENTICAL op_markidentical)
{
    auto ii = cont1.cbegin();
    auto end1 = cont1.cend();
    auto jj = cont2.cbegin();
    auto end2 = cont2.cend();

    if (cont1.empty() || cont2.empty())
        return;

    for (;;)
    {
        // increment iterator to container 1 if it is less
        if (*ii < *jj)
        {
            if (++ii == end1)
                break;
        }
        // increment iterator to container 2 if it is less
        else if (*jj < *ii)
        {
            if (++jj == end2)
                break;
        }
        // same values
        // increment both iterators
        else
        {
            op_markidentical(*ii);
            ++ii;
            if (ii == end1)
                break;
            //
            // Comment out this block if container1 can contain duplicates
            //
            ++jj;
            if (jj == end2)
                break;
        }
    }
}
Here is how you might use this code:
template<typename TT>
struct op_store
{
    vector<TT>& store;
    op_store(vector<TT>& store): store(store){}
    void operator()(TT val){store.push_back(val);}
};

vector<unsigned> first{1,2,3,4,5,6};
vector<unsigned> second{1,2,5,6,7,9};
vector<unsigned> overlap;

set_mark_overlap(first, second, op_store<unsigned>(overlap));

for (const auto& ii : overlap)
    std::cout << ii << ",";
std::cout << "\n";
// 1,2,5,6
This code assumes that neither vector contains duplicates. If your vecSearched vector (passed as the first container below) contains duplicates and you want each duplicate reported, then you need to comment out the indicated code above. If any of your vecOfVec vectors contains duplicates, it is not clear what the appropriate response would be...
In your case, the code to store matching values would be just these three lines:
// results
vector<vector<unsigned> > results(120);
for (unsigned ii = 0; ii < vecOfVec.size(); ++ii)
    set_mark_overlap(vecSearched, vecOfVec[ii], op_store<unsigned>(results[ii]));
In terms of optimisation, your problem has two characteristics: 1) one list is always much shorter than the other, and 2) the shorter list is reused while each longer list is seen only once.
Up-front pre-processing costs (e.g. for the skip lists suggested by @Nikos Athanasiou) are irrelevant for the short list of 9000 elements, which is reused again and again, but not for the longer lists, each of which is seen only once.
I imagine most of the skipping happens in the longer lists, so skip lists may not be a panacea. How about a sort of dynamic skip, so that you jump by N (j += 4,000,000 / 9,000) or by one (++j) when container two is catching up (in the code above)? If you have jumped too far, you can use a mini binary search to find the right amount to increment j by.
Because of this asymmetry in list lengths, I can't see recoding with SIMD helping much: we need to reduce the number of comparisons to less than (N+M) rather than increase the speed of each comparison. However, it depends on your data. Code it up and time it!
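To make that concrete, here is a minimal sketch of the dynamic-skip ("galloping") idea, assuming both ranges are sorted; the function name, the step choice and the std::lower_bound fallback are mine, not tuned values:

#include <algorithm>
#include <vector>

// Intersect a short sorted vector with a much longer sorted vector by
// "galloping": advance through the long vector in large steps, then use a
// bounded binary search (std::lower_bound) over the last window we jumped.
std::vector<unsigned> gallop_intersect(const std::vector<unsigned>& small_v,
                                       const std::vector<unsigned>& big_v)
{
    std::vector<unsigned> out;
    if (small_v.empty() || big_v.empty())
        return out;
    // e.g. 4,000,000 / 9,000 -- a first guess at a good step; measure and adjust
    const std::size_t step = std::max<std::size_t>(1, big_v.size() / small_v.size());
    std::size_t j = 0;
    for (unsigned value : small_v)
    {
        // jump forward in big steps while we are clearly behind
        while (j + step < big_v.size() && big_v[j + step] < value)
            j += step;
        // binary search only inside the last window we jumped over
        const std::size_t hi = std::min(j + step + 1, big_v.size());
        j = std::lower_bound(big_v.begin() + j, big_v.begin() + hi, value) - big_v.begin();
        if (j == big_v.size())
            break;                  // the long vector is exhausted
        if (big_v[j] == value)
            out.push_back(value);   // common element found
    }
    return out;
}

Whether this beats the plain two-pointer merge depends on how interleaved the two value ranges are, so, as everywhere here, time it against your real data.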
Here is the test code to create some vectors of random numbers and check that they are present
#include <iostream>
#include <vector>
#include <unordered_set>
#include <stdexcept> // for runtime_error
#include <algorithm>
#include <limits>
#include <random>
using namespace std;
template<typename TT>
struct op_store
{
    std::vector<TT>& store;
    op_store(vector<TT>& store): store(store){}
    // set_mark_overlap passes a single value, so take one argument here
    void operator()(TT val){store.push_back(val);}
};
void fill_vec_with_unique_random_values(vector<unsigned>& cont, unordered_set<unsigned>& curr_values)
{
    static random_device rd;
    static mt19937 e1(rd());
    static uniform_int_distribution<unsigned> uniform_dist(0, std::numeric_limits<unsigned>::max());

    for (auto& jj : cont)
    {
        for (;;)
        {
            unsigned new_value = uniform_dist(e1);
            // make sure all values are unique
            if (curr_values.count(new_value) == 0)
            {
                curr_values.insert(new_value);
                jj = new_value;
                break;
            }
        }
    }
}
int main (int argc, char *argv[])
{
    static random_device rd;
    static mt19937 e1(rd());

    // vector searched in contains 9000+ elements. Vectors in vecSearched are sorted
    vector<unsigned> vecSearched(9000);
    unordered_set<unsigned> unique_values_9000;
    fill_vec_with_unique_random_values(vecSearched, unique_values_9000);

    //
    // Create a few vectors (120 in the real problem) of size ranging from 1 to 2000000 elements. All vectors in vecOfVec are sorted
    //
    vector<vector<unsigned> > vecOfVec(5);
    normal_distribution<> vec_size_normal_dist(1000000U, 500000U);
    for (unsigned ii = 0; ii < vecOfVec.size(); ++ii)
    {
        std::cerr << " Create Random data set" << ii << " ...\n";
        auto vec_size = min(2000000U, static_cast<unsigned>(vec_size_normal_dist(e1)));
        vecOfVec[ii].resize(vec_size);
        // Do NOT share values with the 9000. We will manually add these later
        unordered_set<unsigned> unique_values(unique_values_9000);
        fill_vec_with_unique_random_values(vecOfVec[ii], unique_values);
    }

    // insert half of vecSearched in our vectors so that we know what we are going to find
    vector<unsigned> correct_results(begin(vecSearched), begin(vecSearched) + 4500);
    for (unsigned ii = 0; ii < vecOfVec.size(); ++ii)
        vecOfVec[ii].insert(vecOfVec[ii].end(), begin(correct_results), end(correct_results));

    // Make sure everything is sorted
    std::cerr << " Sort data ...\n";
    for (unsigned ii = 0; ii < vecOfVec.size(); ++ii)
        sort(begin(vecOfVec[ii]), end(vecOfVec[ii]));
    sort(begin(vecSearched), end(vecSearched));
    sort(begin(correct_results), end(correct_results));

    std::cerr << " Match ...\n";

    // results
    vector<vector<unsigned> > results(vecOfVec.size());
    for (unsigned ii = 0; ii < vecOfVec.size(); ++ii)
    {
        set_mark_overlap(vecSearched, vecOfVec[ii], op_store<unsigned>(results[ii]));
        // check all is well
        if (results[ii] != correct_results)
            throw runtime_error("Oops");
        std::cerr << ii << " done\n";
    }
    return(0);
}
Upvotes: 2
Reputation: 31597
Your problem is a very popular one. Since you have no data correlating the vectors to intersect, it boils down to speeding up the intersection between two vectors, and there are basically two approaches to it:
The first, speeding up the pairwise intersection itself, is usually addressed by three things:
Reducing the number of comparisons. E.g. for small vectors (sized 1 to 50) you should binary search each element to avoid traversing all the 9000+ elements of the subject vector (see the sketch after this list).
Improving code quality to reduce branch mispredictions. E.g. observing that the resulting set will usually be smaller than the input sets, you could transform code like this:
while (Apos < Aend && Bpos < Bend) {
    if (A[Apos] == B[Bpos]) {
        C[Cpos++] = A[Apos];
        Apos++; Bpos++;
    }
    else if (A[Apos] > B[Bpos]) {
        Bpos++;
    }
    else {
        Apos++;
    }
}
into code that "unrolls" such comparisons, creating branches that are easier to predict (example for block size = 2):
while (1) {
    Adat0 = A[Apos]; Adat1 = A[Apos + 1];
    Bdat0 = B[Bpos]; Bdat1 = B[Bpos + 1];
    if (Adat0 == Bdat0) {
        C[Cpos++] = Adat0;
    }
    else if (Adat0 == Bdat1) {
        C[Cpos++] = Adat0;
        goto advanceB;
    }
    else if (Adat1 == Bdat0) {
        C[Cpos++] = Adat1;
        goto advanceA;
    }
    if (Adat1 == Bdat1) {
        C[Cpos++] = Adat1;
        goto advanceAB;
    }
    else if (Adat1 > Bdat1) goto advanceB;
    else goto advanceA;
advanceA:
    Apos += 2;
    if (Apos >= Aend) { break; } else { continue; }
advanceB:
    Bpos += 2;
    if (Bpos >= Bend) { break; } else { continue; }
advanceAB:
    Apos += 2; Bpos += 2;
    if (Apos >= Aend || Bpos >= Bend) { break; }
}
// fall back to the naive algorithm for the remaining elements
Using SIMD instructions to perform block operations
These techniques are hard to describe in a QA context, but you can read about them (plus relevant optimizations like the if conversion) here and here, or find implementation elements here.
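As a rough illustration of the first point (binary searching each element of a very small vector), here is a sketch; the name and signature are mine, and the crossover point where this beats a linear merge has to be measured:

#include <algorithm>
#include <vector>

// For a tiny sorted vector (say 1 to 50 elements), binary searching each of its
// elements in the large sorted vector costs O(small * log(big)) comparisons,
// which can beat the O(small + big) linear merge when "small" is very small.
std::vector<unsigned> intersect_tiny(const std::vector<unsigned>& tiny,
                                     const std::vector<unsigned>& big)
{
    std::vector<unsigned> out;
    auto from = big.begin();                  // never search before a previous hit
    for (unsigned value : tiny)
    {
        from = std::lower_bound(from, big.end(), value);
        if (from == big.end())
            break;                            // nothing larger remains in big
        if (*from == value)
            out.push_back(value);             // common element
    }
    return out;
}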
The second approach, indexing the subject vector, is IMHO a better fit for your case because you have a single subject vector of 9000+ elements. You could make an interval tree out of it, or simply find a way to index it, e.g. build a structure that will speed up searches against it:
vector<unsigned> subject;   // the 9000+ element vector
vector<range>    index;     // roughly subject.size() / M ranges
where range is a struct like
struct range {
    unsigned min, max;
};
thus creating a sequence of ranges like so
[0, 100], [101, 857], ... [33221, 33500]
that will allow you to skip many comparisons when doing the intersection (for example, if an element of the other set is larger than the max of a subrange, you can skip that subrange completely).
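A rough sketch of how such an index could be built and used, with M as a bucket width you would tune (M > 0); the struct mirrors the one above and the function names are mine:

#include <algorithm>
#include <vector>

struct range {
    unsigned min, max;   // smallest and largest value in one bucket of the subject
};

// Build roughly subject.size() / M ranges over the sorted subject vector.
std::vector<range> build_index(const std::vector<unsigned>& subject, std::size_t M)
{
    std::vector<range> index;
    for (std::size_t i = 0; i < subject.size(); i += M)
    {
        const std::size_t last = std::min(i + M, subject.size()) - 1;
        index.push_back({subject[i], subject[last]});
    }
    return index;
}

// Intersect one long sorted vector with the indexed subject: buckets whose max
// is below the current element are skipped without any element-wise comparison.
std::vector<unsigned> intersect_with_index(const std::vector<unsigned>& other,
                                           const std::vector<unsigned>& subject,
                                           const std::vector<range>& index,
                                           std::size_t M)
{
    std::vector<unsigned> out;
    std::size_t bucket = 0;
    std::size_t i = 0;                         // position inside subject
    for (unsigned value : other)               // other is sorted as well
    {
        while (bucket < index.size() && index[bucket].max < value)
        {
            ++bucket;                          // skip a whole bucket at once
            i = bucket * M;
        }
        if (bucket == index.size())
            break;                             // subject exhausted
        while (i < subject.size() && subject[i] < value)
            ++i;                               // linear scan inside the bucket
        if (i < subject.size() && subject[i] == value)
            out.push_back(value);
    }
    return out;
}

One could also binary search inside the selected bucket instead of the linear scan; which wins depends on M and on the data.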
Yep, there's always a third element in a list of two :P. When you have optimized the procedure enough (and only then), do break up your work into chunks and run them in parallel. The problem is embarrassingly parallel, so 200 vectors vs 1 should definitely run as "50 vs 1, four times concurrently", as sketched below.
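A minimal sketch of that chunking with std::async; the chunk count and the plain merge helper are placeholders for whatever intersection routine you end up with:

#include <algorithm>
#include <future>
#include <vector>

// Plain sorted-merge intersection used per vector (swap in any faster routine).
static std::vector<unsigned> intersect_one(const std::vector<unsigned>& a,
                                           const std::vector<unsigned>& b)
{
    std::vector<unsigned> out;
    std::size_t i = 0, j = 0;
    while (i < a.size() && j < b.size())
    {
        if (a[i] < b[j])      ++i;
        else if (b[j] < a[i]) ++j;
        else { out.push_back(a[i]); ++i; ++j; }  // common element
    }
    return out;
}

// Split the "200 vectors vs 1" job into chunks and run them concurrently:
// every task reads the shared vecSearched and writes only its own result slots.
std::vector<std::vector<unsigned>>
intersect_all_parallel(const std::vector<unsigned>& vecSearched,
                       const std::vector<std::vector<unsigned>>& vecOfVec,
                       unsigned num_chunks = 4)
{
    std::vector<std::vector<unsigned>> results(vecOfVec.size());
    std::vector<std::future<void>> tasks;
    const std::size_t chunk = (vecOfVec.size() + num_chunks - 1) / num_chunks;
    for (std::size_t begin = 0; begin < vecOfVec.size(); begin += chunk)
    {
        const std::size_t end = std::min(begin + chunk, vecOfVec.size());
        tasks.push_back(std::async(std::launch::async, [&, begin, end] {
            for (std::size_t i = begin; i < end; ++i)
                results[i] = intersect_one(vecSearched, vecOfVec[i]);
        }));
    }
    for (auto& t : tasks)
        t.get();                                 // join all chunks
    return results;
}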
Test, measure, redesign!!
Upvotes: 5
Reputation: 1723
Using std::set_intersection could help, but I do not know whether it improves the overall speed:
// needs <algorithm> (std::set_intersection), <iterator> (std::inserter) and <set>
vector<vector<unsigned int> > vecOfVec(200);
vector<unsigned int> vecSearched;
set<unsigned int> intersection;

for(auto it = vecOfVec.begin(); it != vecOfVec.end(); ++it)
{
    std::set_intersection(it->begin(), it->end(),
                          vecSearched.begin(), vecSearched.end(),
                          std::inserter(intersection, intersection.begin()));
}
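If you want one result per vector of vecOfVec (as in the question), and since the inputs are already sorted, writing into a vector through std::back_inserter avoids the per-element cost of inserting into a std::set; a small sketch, not a measured improvement:

#include <algorithm>
#include <iterator>
#include <vector>

// One sorted result vector per input vector, filled with std::set_intersection.
std::vector<std::vector<unsigned int>>
intersect_each(const std::vector<std::vector<unsigned int>>& vecOfVec,
               const std::vector<unsigned int>& vecSearched)
{
    std::vector<std::vector<unsigned int>> results(vecOfVec.size());
    for (std::size_t i = 0; i < vecOfVec.size(); ++i)
        std::set_intersection(vecOfVec[i].begin(), vecOfVec[i].end(),
                              vecSearched.begin(), vecSearched.end(),
                              std::back_inserter(results[i]));
    return results;
}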
Upvotes: 1
Reputation: 5230
If I understood your code correctly, then with N the number of vectors and M the number of elements inside each vector, your algorithm is roughly O(N * M^2). Then there is the 'bucket' strategy that improves things a bit, but its effect is difficult to evaluate at first sight.
I would suggest you work on sorted vectors and compute the intersections on sorted ones. Something like this:
vector<vector<unsigned> > vecOfVec;
vector<unsigned> vecSearched;

for (vector<unsigned> v : vecOfVec) // yes, a copy
{
    std::sort(v.begin(), v.end());
    if (vecSearched.empty()) // first run detection
        vecSearched = v;
    else
    {   // compute intersection of v and vecSearched
        auto vit  = v.begin();
        auto vend = v.end();
        auto sit  = vecSearched.begin();
        auto send = vecSearched.end();
        vector<unsigned> result;
        while (vit != vend && sit != send)
        {
            if (*vit < *sit)
                ++vit;
            else if (*vit == *sit)
            {
                result.push_back(*vit);
                ++vit;
                ++sit;
            }
            else // *vit > *sit
                ++sit;
        }
        vecSearched = result;
    }
}
The code is untested, but the idea behind it is that intersecting sorted vectors is easy since you can compare the two iterators (vit, sit) and advance the one pointing to the smaller value. So each intersection is linear in M, and the whole complexity is O(N * M * log(M)), where the log(M) factor comes from sorting.
Upvotes: 4