Reputation: 1719
I have two int
vectors
like a[100]
, b[100]
.
The simple way to calculate their hamming distance is:
std::vector<int> a(100);
std::vector<int> b(100);
double dist = 0;
for(int i = 0; i < 100; i++){
if(a[i] != b[i])
dist++;
}
dist /= a.size();
I would like to ask: is there a faster way to do this calculation in C++, or a way to use the STL to do the same job?
Upvotes: 6
Views: 6208
Reputation: 12018
You asked for a faster way. This is an embarrassingly parallel problem, so with C++ you can take advantage of that in two ways: thread parallelism, and vectorization through optimization.
//The following flags allow cpu specific vectorization optimizations on *my cpu*
//clang++ -march=corei7-avx hd.cpp -o hd -Ofast -pthread -std=c++1y
//g++ -march=corei7-avx hd.cpp -o hd -Ofast -pthread -std=c++1y
#include <vector>
#include <thread>
#include <future>
#include <numeric>
// Count the positions at which two equal-length sequences differ.
//
// T  - accumulator/return type for the mismatch count.
// b1 - iterator to the first element of the first sequence.
// b2 - iterator to the first element of the second sequence.
// Returns the number of indices i in [0, size) with b1[i] != b2[i].
template<class T, class I1, class I2>
T hamming_distance(size_t size, I1 b1, I2 b2) {
    // not_equal_to yields 1 for a mismatch and 0 otherwise;
    // plus sums those flags, so the result is the mismatch count.
    return std::inner_product(b1, b1 + size, b2, T{},
                              std::plus<T>(), std::not_equal_to<T>());
}

// Parallel mismatch count: splits the range into `threads` equal chunks,
// processes each chunk in its own std::async task, and sums the partial
// counts.  Falls back to the serial version for small inputs where
// thread start-up cost would dominate.
template<class T, class I1, class I2>
T parallel_hamming_distance(size_t threads, size_t size, I1 b1, I2 b2) {
    if(size < 1000)
        return hamming_distance<T, I1, I2>(size, b1, b2);
    // FIX: thread::hardware_concurrency() may report 0 when the core
    // count cannot be determined; without this guard `size / threads`
    // below would divide by zero.
    if(threads == 0)
        threads = 1;
    if(threads > size)
        threads = size;
    const size_t whole_part = size / threads;
    const size_t remainder = size - threads * whole_part;
    std::vector<std::future<T>> bag;
    bag.reserve(threads + (remainder > 0 ? 1 : 0));
    for(size_t i = 0; i < threads; ++i)
        bag.emplace_back(std::async(std::launch::async,
                                    hamming_distance<T, I1, I2>,
                                    whole_part,
                                    b1 + i * whole_part,
                                    b2 + i * whole_part));
    // Any leftover elements (size not divisible by threads) go into one
    // extra task so every position is counted exactly once.
    if(remainder > 0)
        bag.emplace_back(std::async(std::launch::async,
                                    hamming_distance<T, I1, I2>,
                                    remainder,
                                    b1 + threads * whole_part,
                                    b2 + threads * whole_part));
    // Renamed from `hamming_distance`: the old local shadowed the
    // function template above, which was needlessly confusing.
    T total = 0;
    for(auto &f : bag) total += f.get();
    return total;
}
#include <ratio>
#include <random>
#include <chrono>
#include <iostream>
#include <cinttypes>
// Benchmark driver: compares the serial and the parallel mismatch count
// on 100 million random digits and reports both wall-clock times.
int main() {
    using namespace std;
    using namespace chrono;

    // Random test data: 100 * mega::num (= 100,000,000) ints in [0, 9]
    // per vector, ~400 MB each.
    random_device rd;
    mt19937 gen(rd());
    uniform_int_distribution<> random_0_9(0, 9);
    const auto size = 100 * mega::num;
    vector<int32_t> v1(size);
    vector<int32_t> v2(size);
    for(auto &x : v1) x = random_0_9(gen);
    for(auto &x : v2) x = random_0_9(gen);

    cout << "naive hamming distance: ";
    const auto naive_start = high_resolution_clock::now();
    cout << hamming_distance<int32_t>(v1.size(), begin(v1), begin(v2)) << endl;
    const auto naive_elapsed = high_resolution_clock::now() - naive_start;

    // FIX: hardware_concurrency() may return 0 when the core count
    // cannot be detected; fall back to one worker rather than passing
    // 0 into parallel_hamming_distance.
    const auto hw_threads = thread::hardware_concurrency();
    const auto n = hw_threads != 0 ? hw_threads : 1u;

    cout << "parallel hamming distance: ";
    const auto parallel_start = high_resolution_clock::now();
    cout << parallel_hamming_distance<int32_t>(
            n,
            v1.size(),
            begin(v1),
            begin(v2)
         )
         << endl;
    const auto parallel_elapsed = high_resolution_clock::now() - parallel_start;

    // Convert a steady duration to whole microseconds for printing.
    auto count_microseconds =
        [](const high_resolution_clock::duration &elapsed) {
            return duration_cast<microseconds>(elapsed).count();
        };

    cout << "naive delay: " << count_microseconds(naive_elapsed) << endl;
    cout << "parallel delay: " << count_microseconds(parallel_elapsed) << endl;
}
Notice that I'm not performing the division by the vector size here.
Results for my machine (which show that the parallel version doesn't gain much on a machine with only 2 physical cores...):
$ clang++ -march=corei7-avx hd.cpp -o hd -Ofast -pthread -std=c++1y -stdlib=libc++ -lcxxrt -ldl
$ ./hd
naive hamming distance: 89995190
parallel hamming distance: 89995190
naive delay: 52758
parallel delay: 47227
$ clang++ hd.cpp -o hd -O3 -pthread -std=c++1y -stdlib=libc++ -lcxxrt -ldl
$ ./hd
naive hamming distance: 90001042
parallel hamming distance: 90001042
naive delay: 53851
parallel delay: 46887
$ g++ -march=corei7-avx hd.cpp -o hd -Ofast -pthread -std=c++1y -Wl,--no-as-needed
$ ./hd
naive hamming distance: 90001825
parallel hamming distance: 90001825
naive delay: 55229
parallel delay: 49355
$ g++ hd.cpp -o hd -O3 -pthread -std=c++1y -Wl,--no-as-needed
$ ./hd
naive hamming distance: 89996171
parallel hamming distance: 89996171
naive delay: 54189
parallel delay: 44928
Also I see no effect from auto vectorization, may have to check the assembly...
For a sample about vectorization and compiler options check this blog post of mine.
Upvotes: 5
Reputation: 33669
There is a very simple way to optimize this.
// Branch-free accumulation: (a[i] != b[i]) evaluates to 1 on a mismatch
// and 0 otherwise, so summing it counts the differing positions.
int disti = 0;
for(int i = 0; i < n; i++) disti += (a[i] != b[i]);
// Multiplying by 1.0 first forces floating-point (not integer) division.
double dist = 1.0*disti/a.size();
This skips the branch and exploits the fact that a conditional test evaluates to 1 or 0. Additionally, it is auto-vectorized in GCC (use -ftree-vectorizer-verbose=1
to check) while the version in the question is not.
Edit:
I went ahead and tested this out with the function in the question which I called hamming_distance
the simple fix I suggested which I call hamming_distance_fix
and a version which uses the fix as well as OpenMP which I call hamming_distance_fix_omp
. Here are the times
hamming_distance 1.71 seconds
hamming_distance_fix 0.38 seconds //SIMD
hamming_distance_fix_omp 0.12 seconds //SIMD + MIMD
Here is the code. I did not use much syntactic sugar, but it should be very easy to convert this to use the STL and so forth... You can see the results here http://coliru.stacked-crooked.com/a/31293bc88cff4794
//g++-4.8 -std=c++11 -O3 -fopenmp -msse2 -Wall -pedantic -pthread main.cpp && ./a.out
#include <stdio.h>
#include <stdlib.h>
#include <omp.h>
// Naive baseline for the benchmark: a branch inside the loop and a
// double accumulator — intentionally the slow form from the question.
// Returns the fraction of the n positions where a and b differ.
double hamming_distance(int* a, int* b, int n) {
    double mismatches = 0;
    int idx = 0;
    while (idx < n) {
        if (a[idx] != b[idx]) {
            mismatches++;
        }
        ++idx;
    }
    return mismatches / n;
}
// Branch-free variant: the comparison itself yields 0 or 1, so the
// loop body is a plain integer add that GCC can auto-vectorize.
// Returns the fraction of the n positions where a and b differ.
double hamming_distance_fix(int* a, int* b, int n) {
    int mismatch_count = 0;
    for (int idx = 0; idx != n; ++idx)
        mismatch_count += static_cast<int>(a[idx] != b[idx]);
    return static_cast<double>(mismatch_count) / n;
}
// Branch-free variant plus OpenMP: each thread sums its own partial
// count and the reduction(+:...) clause combines them at the end.
// Returns the fraction of the n positions where a and b differ.
double hamming_distance_fix_omp(int* a, int* b, int n) {
    int mismatch_count = 0;
    #pragma omp parallel for reduction(+:mismatch_count)
    for (int idx = 0; idx < n; ++idx) {
        mismatch_count += static_cast<int>(a[idx] != b[idx]);
    }
    return static_cast<double>(mismatch_count) / n;
}
// Benchmark driver: times the three Hamming-distance variants over
// `repeat` passes on the same random data and prints each result.
int main() {
    const int n = 1<<16;         // 65,536 elements per array
    const int repeat = 10000;    // enough passes for a measurable time
    int *a = new int[n];
    int *b = new int[n];
    for(int i=0; i<n; i++)
    {
        a[i] = rand()%10;
        b[i] = rand()%10;
    }

    double dtime, dist;

    // Naive: branch in the loop, double accumulator.
    dtime = omp_get_wtime();
    for(int i=0; i<repeat; i++) dist = hamming_distance(a,b,n);
    dtime = omp_get_wtime() - dtime;
    printf("dist %f, time (s) %f\n", dist, dtime);

    // Fixed: branch-free, int accumulator (auto-vectorized / SIMD).
    dtime = omp_get_wtime();
    for(int i=0; i<repeat; i++) dist = hamming_distance_fix(a,b,n);
    dtime = omp_get_wtime() - dtime;
    printf("dist %f, time (s) %f\n", dist, dtime);

    // Fixed + OpenMP (SIMD + MIMD).
    dtime = omp_get_wtime();
    for(int i=0; i<repeat; i++) dist = hamming_distance_fix_omp(a,b,n);
    dtime = omp_get_wtime() - dtime;
    printf("dist %f, time (s) %f\n", dist, dtime);

    // FIX: the original leaked both arrays — release them before exit.
    delete [] a;
    delete [] b;
}
Upvotes: 3
Reputation: 75727
As an observation, working with double is very slow, even for incrementing. So you should use an int inside the for loop
(for counting), and then use a double for the division.
As a speed up, one way to test I could think of is to use SSE instructions:
Pseudocode:
distance = 0
SSE register e1
SSE register e2
for each 4 elements in vectors
load 4 members from a in e1
load 4 members from b in e2
if e1 == e2
continue
else
check each 4 members individually (using e1 and e2)
dist /= total number of elements
In a real (not-pseudocode) program, this can be tweaked so that the compiler can use cmov
instructions instead of branches
.
The main advantage here is that we have 4 times less reads from memory.
A disadvantage is that we have an extra check for each 4 checks we had previously.
Depending on how this gets implemented in assembly via cmoves
or branches
, this might be even faster for vectors that have many adjacent positions with the same value in the two vectors.
I really can't tell how this will perform comparing with the standard solution, but at the very least is worth testing.
Upvotes: 0