Kroma
Kroma

Reputation: 1151

SIMD vector intersection of two sets using AVX-512: need a speed up

I currently need to intersect two sorted vectors.

Here are my implementations:

#include <immintrin.h>
#include <stdint.h>

#include <algorithm>
#include <bit>
#include <chrono>
#include <cstring>
#include <iostream>
#include <string>
#include <vector>

using namespace std;

// Type your code here, or load an example.
// Scalar reference implementation.
//
// Appends to `res` every value that occurs in both `a` and `b`, using the
// standard library's linear merge (std::set_intersection). Both inputs must
// already be sorted in ascending order. Existing contents of `res` are kept;
// new values are pushed onto the end.
void square(vector<uint64_t> &res, vector<uint64_t> &a, vector<uint64_t> &b) {
  auto sink = std::back_inserter(res);
  std::set_intersection(a.cbegin(), a.cend(), b.cbegin(), b.cend(), sink);
}

void super(vector<uint64_t> &res, vector<uint64_t> &a, vector<uint64_t> &b) {
  uint32_t offset = 0;

  for (auto it = a.begin(), it_end = a.end(); it != it_end; ++it) {
    auto r = _mm512_set1_epi64((uint64_t)*it);

    for (auto it2 = b.begin(), it2_end = b.end(); it2 != it2_end; it2 += 8) {
      auto distance = it2 - b.begin();
      auto *ptr = b.data() + distance;

      auto s = _mm512_loadu_epi64((const void *)ptr);

      auto m = _mm512_cmpeq_epu64_mask(r, s);

      auto count = popcount(m);

      auto *ptr2 = &(*(res.begin() + offset));

      _mm512_mask_compressstoreu_epi64((void *)ptr2, m, s);

      offset += count;
    }
  }

  // res.resize(offset);
}

// Benchmark driver.
//
// Builds two ascending 64Ki-element inputs (`a` = 0..65535, `b` = `a` shifted
// up by 21), then times 10 runs of super() followed by 10 runs of square().
// For each run it prints the elapsed time in nanoseconds and then the first
// few values of the result, one per line. A "-------" line separates the two
// groups.
int main() {
  constexpr uint32_t kElementCount = 64 * 1024;
  constexpr uint32_t kShift = 21;
  constexpr uint32_t kRuns = 10;
  constexpr std::size_t kResultSlots = 66000;  // >= number of possible matches
  constexpr std::size_t kPreview = 4;          // result values echoed per run

  std::vector<uint64_t> a, b;
  a.reserve(kElementCount);
  b.reserve(kElementCount);
  for (uint32_t x = 0; x < kElementCount; ++x) {
    a.push_back(x);
    b.push_back(x + kShift);
  }

  std::vector<uint64_t> res;

  // Times one call of `intersect`, then prints the duration and a preview of
  // `res`. The preview is bounded by res.size() so it never reads past the
  // elements the vector actually holds.
  const auto time_run = [&](auto &&intersect) {
    const auto start = std::chrono::high_resolution_clock::now();
    intersect();
    const auto stop = std::chrono::high_resolution_clock::now();
    const auto elapsed =
        std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start);

    std::cout << elapsed.count() << '\n';

    const std::size_t shown = std::min(kPreview, res.size());
    for (std::size_t i = 0; i < shown; ++i) {
      std::cout << res[i] << '\n';
    }
  };

  for (uint32_t t = 0; t < kRuns; ++t) {
    // Give super() a fully sized buffer to write into on every run; a cleared
    // vector has spare capacity but no elements that may legally be
    // overwritten or read back. Done outside the timed region.
    res.assign(kResultSlots, 0);
    time_run([&] { super(res, a, b); });
  }

  std::cout << "-------" << '\n';

  for (uint32_t t = 0; t < kRuns; ++t) {
    // square() appends, so start every run from an empty result.
    res.clear();
    time_run([&] { square(res, a, b); });
  }
}

I created a Godbolt Compiler Explorer example to make testing easier for the community: Godbolt's test box

My concern is the abysmal performance of the SIMD version, even though it uses AVX-512 instructions (the latencies are huge).

The result is correct though.

Does anyone have a clue how to make it fast?

Edit: I made my vectors a multiple of 8 for this test case, so don't bother with the checks.

Upvotes: 2

Views: 687

Answers (0)

Related Questions