sflee

Reputation: 1719

Faster form for Hamming distance in C++ (potentially taking advantage of the standard library)?

I have two int vectors, e.g. a[100] and b[100].
The simple way to calculate their Hamming distance is:

std::vector<int> a(100);
std::vector<int> b(100);

double dist = 0;    
for(int i = 0; i < 100; i++){
    if(a[i] != b[i])
        dist++;
}
dist /= a.size();

I would like to ask: is there a faster way to do this calculation in C++, or a way to use the STL to do the same job?

Upvotes: 6

Views: 6208

Answers (3)

oblitum

Reputation: 12018

You asked for a faster way. This is an embarrassingly parallel problem, so with C++ you can take advantage of that in two ways: thread parallelism, and vectorization through optimization.

//The following flags allow cpu specific vectorization optimizations on *my cpu*
//clang++ -march=corei7-avx hd.cpp -o hd -Ofast -pthread -std=c++1y
//g++ -march=corei7-avx hd.cpp -o hd -Ofast -pthread -std=c++1y

#include <vector>
#include <thread>
#include <future>
#include <numeric>

template<class T, class I1, class I2>
T hamming_distance(size_t size, I1 b1, I2 b2) {
    return std::inner_product(b1, b1 + size, b2, T{},
            std::plus<T>(), std::not_equal_to<T>());
}

template<class T, class I1, class I2>
T parallel_hamming_distance(size_t threads, size_t size, I1 b1, I2 b2) {
    if(size < 1000)
       return hamming_distance<T, I1, I2>(size, b1, b2);

    if(threads > size)
        threads = size;

    const size_t whole_part = size / threads;
    const size_t remainder = size - threads * whole_part;

    std::vector<std::future<T>> bag;
    bag.reserve(threads + (remainder > 0 ? 1 : 0));

    for(size_t i = 0; i < threads; ++i)
        bag.emplace_back(std::async(std::launch::async,
                            hamming_distance<T, I1, I2>,
                            whole_part,
                            b1 + i * whole_part,
                            b2 + i * whole_part));
    if(remainder > 0)
        bag.emplace_back(std::async(std::launch::async,
                            hamming_distance<T, I1, I2>,
                            remainder,
                            b1 + threads * whole_part,
                            b2 + threads * whole_part));

    T hamming_distance = 0;
    for(auto &f : bag) hamming_distance += f.get();
    return hamming_distance;
}

#include <ratio>
#include <random>
#include <chrono>
#include <iostream>
#include <cinttypes>

int main() {
    using namespace std;
    using namespace chrono;

    random_device rd;
    mt19937 gen(rd());
    uniform_int_distribution<> random_0_9(0, 9);

    const auto size = 100 * mega::num;
    vector<int32_t> v1(size);
    vector<int32_t> v2(size);

    for(auto &x : v1) x = random_0_9(gen);
    for(auto &x : v2) x = random_0_9(gen);

    cout << "naive hamming distance: ";
    const auto naive_start = high_resolution_clock::now();
    cout << hamming_distance<int32_t>(v1.size(), begin(v1), begin(v2)) << endl;
    const auto naive_elapsed = high_resolution_clock::now() - naive_start;

    const auto n = thread::hardware_concurrency();

    cout << "parallel hamming distance: ";
    const auto parallel_start = high_resolution_clock::now();
    cout << parallel_hamming_distance<int32_t>(
                                                    n,
                                                    v1.size(),
                                                    begin(v1),
                                                    begin(v2)
                                              )
         << endl;
    const auto parallel_elapsed = high_resolution_clock::now() - parallel_start;

    auto count_microseconds =
        [](const high_resolution_clock::duration &elapsed) {
            return duration_cast<microseconds>(elapsed).count();
        };

    cout << "naive delay:    " << count_microseconds(naive_elapsed) << endl;
    cout << "parallel delay: " << count_microseconds(parallel_elapsed) << endl;
}

Notice that I'm not dividing by the vector size.
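If you do want the normalized distance from the question, divide afterwards; a trivial sketch using the names from main above (not part of the timed code):

double normalized_dist = 1.0 * parallel_hamming_distance<int32_t>(n, v1.size(), begin(v1), begin(v2)) / v1.size();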

Results on my machine (which show there isn't much to gain on a machine with only 2 physical cores...):

$ clang++ -march=corei7-avx hd.cpp -o hd -Ofast -pthread -std=c++1y -stdlib=libc++ -lcxxrt -ldl
$ ./hd
naive hamming distance: 89995190
parallel hamming distance: 89995190
naive delay:    52758
parallel delay: 47227

$ clang++ hd.cpp -o hd -O3 -pthread -std=c++1y -stdlib=libc++ -lcxxrt -ldl
$ ./hd
naive hamming distance: 90001042
parallel hamming distance: 90001042
naive delay:    53851
parallel delay: 46887

$ g++ -march=corei7-avx hd.cpp -o hd -Ofast -pthread -std=c++1y -Wl,--no-as-needed
$ ./hd
naive hamming distance: 90001825
parallel hamming distance: 90001825
naive delay:    55229
parallel delay: 49355

$ g++ hd.cpp -o hd -O3 -pthread -std=c++1y -Wl,--no-as-needed
$ ./hd
naive hamming distance: 89996171
parallel hamming distance: 89996171
naive delay:    54189
parallel delay: 44928

Also, I see no effect from auto-vectorization; I may have to check the assembly...
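One way to check without digging through the assembly is to ask the compiler for a vectorization report; assuming reasonably recent GCC and clang, something along these lines (the report flags are the only change from the command lines above):

$ g++ -march=corei7-avx hd.cpp -o hd -Ofast -pthread -std=c++1y -fopt-info-vec
$ clang++ -march=corei7-avx hd.cpp -o hd -Ofast -pthread -std=c++1y -Rpass=loop-vectorize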

For a sample about vectorization and compiler options, check this blog post of mine.

Upvotes: 5

Z boson

Reputation: 33669

There is a very simple way to optimize this.

int disti = 0;    
for(int i = 0; i < n; i++) disti += (a[i] != b[i]);
double dist = 1.0*disti/a.size();

This skips the branch and exploits the fact that a comparison returns 1 or 0. Additionally, it is auto-vectorized by GCC (use -ftree-vectorizer-verbose=1 to check), while the version in the question is not.

Edit:

I went ahead and tested this out with the function in the question, which I call hamming_distance, the simple fix I suggested, which I call hamming_distance_fix, and a version which uses the fix as well as OpenMP, which I call hamming_distance_fix_omp. Here are the times:

hamming_distance          1.71 seconds
hamming_distance_fix      0.38 seconds  //SIMD
hamming_distance_fix_omp  0.12 seconds  //SIMD + MIMD

Here is the code. I did not use much syntactic sugar, but it should be very easy to convert this to use the STL and so forth... You can see the results here: http://coliru.stacked-crooked.com/a/31293bc88cff4794

//g++-4.8 -std=c++11 -O3 -fopenmp -msse2 -Wall -pedantic -pthread main.cpp && ./a.out
#include <stdio.h>
#include <stdlib.h>
#include <omp.h>

double hamming_distance(int* a, int*b, int n) {
    double dist = 0;
    for(int i=0; i<n; i++) {
        if (a[i] != b[i]) dist++;
    }
    return dist/n;
}
double hamming_distance_fix(int* a, int* b, int n) {
    int disti = 0;
    for(int i=0; i<n; i++) {
       disti += (a[i] != b[i]);
    }
    return 1.0*disti/n;
}

double hamming_distance_fix_omp(int* a, int* b, int n) {
    int disti = 0;
    #pragma omp parallel for reduction(+:disti)
    for(int i=0; i<n; i++) {
       disti += (a[i] != b[i]);
    }
    return 1.0*disti/n;
}

int main() {
    const int n = 1<<16;
    const int repeat = 10000;
    int *a = new int[n];
    int *b = new int[n];
    for(int i=0; i<n; i++) 
    { 
        a[i] = rand()%10;
        b[i] = rand()%10;
    }

    double dtime, dist;
    dtime = omp_get_wtime();
    for(int i=0; i<repeat; i++) dist = hamming_distance(a,b,n);
    dtime = omp_get_wtime() - dtime;
    printf("dist %f, time (s) %f\n", dist, dtime);

    dtime = omp_get_wtime();
    for(int i=0; i<repeat; i++) dist = hamming_distance_fix(a,b,n);
    dtime = omp_get_wtime() - dtime;
    printf("dist %f, time (s) %f\n", dist, dtime);

    dtime = omp_get_wtime();
    for(int i=0; i<repeat; i++) dist = hamming_distance_fix_omp(a,b,n);
    dtime = omp_get_wtime() - dtime;
    printf("dist %f, time (s) %f\n", dist, dtime);  
}

Upvotes: 3

bolov

Reputation: 75727

As an observation, working with double is very slow, even for an increment, so you should use an int inside the for loop (for the incrementing), and then use a double for the division.

As a speed-up, one approach worth testing is to use SSE instructions:

Pseudocode:

dist = 0
SSE register e1
SSE register e2
for each 4 elements in the vectors
  load 4 elements from a into e1
  load 4 elements from b into e2
  if e1 == e2
    continue
  else
    check each of the 4 elements individually (using e1 and e2)
dist /= vector size

In a real (not-pseudocode) program, this can be tweaked so that the compiler uses cmov instructions instead of branches.

The main advantage here is that we issue 4 times fewer reads from memory.
A disadvantage is that we have an extra check for each 4 checks we had previously.
Depending on whether this ends up implemented in assembly via cmovs or via branches, it might even be faster for vectors that have many adjacent positions holding the same value in the two vectors.

I really can't tell how this will perform compared with the straightforward solution, but at the very least it is worth testing.
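For reference, a minimal SSE2 intrinsics sketch of this idea might look like the following. This is my own rough translation of the pseudocode, not a tested or tuned implementation; it counts the differing lanes with a mask and a popcount instead of branching per group of 4, and the function name hamming_distance_sse2 is just illustrative.

#include <emmintrin.h>   // SSE2 intrinsics
#include <cstdint>

int hamming_distance_sse2(const int32_t* a, const int32_t* b, int n) {
    int dist = 0;
    int i = 0;
    for (; i + 4 <= n; i += 4) {
        // Load 4 elements from each vector.
        __m128i e1 = _mm_loadu_si128(reinterpret_cast<const __m128i*>(a + i));
        __m128i e2 = _mm_loadu_si128(reinterpret_cast<const __m128i*>(b + i));
        // Per-lane equality: equal lanes become all ones, differing lanes all zeros.
        __m128i eq = _mm_cmpeq_epi32(e1, e2);
        // Gather the sign bit of each 32-bit lane into a 4-bit mask (bit set = equal).
        int equal_mask = _mm_movemask_ps(_mm_castsi128_ps(eq));
        // Count the lanes that differ (__builtin_popcount is a GCC/Clang builtin).
        dist += 4 - __builtin_popcount(equal_mask);
    }
    for (; i < n; ++i)   // scalar tail for any leftover elements
        dist += (a[i] != b[i]);
    return dist;
}

Whether this actually beats the compiler's auto-vectorized version of the branchless loop from the other answer is exactly what would need measuring.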

Upvotes: 0
