p4dn24x

Reputation: 445

How to efficiently merge k sorted pairwise key/value vectors by keys?

I want to merge k sorted pairwise key/value vectors by keys. Typically, the size n of the vectors is very large (e.g., n >= 4,000,000,000).

Consider the following example for k = 2:

// Input
keys_1 = [1, 2, 3, 4], values_1 = [11, 12, 13, 14]
keys_2 = [3, 4, 5, 6], values_2 = [23, 24, 25, 26]

// Output
merged_keys = [1, 2, 3, 3, 4, 4, 5, 6], merged_values = [11, 12, 13, 23, 14, 24, 25, 26]

Since __gnu_parallel::multiway_merge is a highly efficient k-way merge algorithm, I tried to utilize a state-of-the-art zip iterator (https://github.com/dpellegr/ZipIterator) to "combine" the key-value pair vectors.

#include <iostream>
#include <vector>
#include <parallel/algorithm>

#include "ZipIterator.hpp"

int main(int argc, char* argv[]) {
  std::vector<int> keys_1   = {1, 2, 3, 4};
  std::vector<int> values_1 = {11, 12, 13, 14};
  std::vector<int> keys_2   = {3, 4, 5, 6};
  std::vector<int> values_2 = {23, 24, 25, 26};

  std::vector<int> merged_keys(8);
  std::vector<int> merged_values(8);

  auto kv_it_1 = Zip(keys_1, values_1);
  auto kv_it_2 = Zip(keys_2, values_2);
  auto mkv_it = Zip(merged_keys, merged_values);

  auto it_pairs = {std::make_pair(kv_it_1.begin(), kv_it_1.end()),
                   std::make_pair(kv_it_2.begin(), kv_it_2.end())};

  __gnu_parallel::multiway_merge(it_pairs.begin(), it_pairs.end(), mkv_it.begin(), 8, std::less<>());
  
  for (size_t i = 0; i < 8; ++i) {
    std::cout << merged_keys[i] << ":" << merged_values[i] << (i == 7 ? "\n" : ", ");
  }

  return 0;
}

However, I get various compilation errors (building with -O3):

error: cannot bind non-const lvalue reference of type ‘std::__iterator_traits<ZipIter<__gnu_cxx::__normal_iterator<int*, std::vector<int, std::allocator > >, __gnu_cxx::__normal_iterator<int*, std::vector<int, std::allocator > > >, void>::value_type&’ {aka ‘std::tuple<int, int>&’} to an rvalue of type ‘std::tuple<int, int>’

error: cannot convert ‘ZipIter<__gnu_cxx::__normal_iterator<int*, std::vector<int, std::allocator > >, __gnu_cxx::__normal_iterator<int*, std::vector<int, std::allocator > > >::reference*’ {aka ‘ZipRef<int, int>’} to ‘_ValueType’ {aka ‘std::tuple<int, int>*’}

Is it possible to modify the ZipIterator to make it work?

Or is there a more efficient way of merging k sorted pairwise key/value vectors by keys?

Considered Alternatives

  1. Define a KeyValuePair struct with int key and int value members as well as operator< and operator<= (see the sketch after this list). Move the elements of the key/value vectors into std::vector<KeyValuePair>s, call __gnu_parallel::multiway_merge on those, and then move the merged elements back into the key/value vectors. [Verdict: slow execution and high memory overhead, even with -O3]
  2. Use std::merge(std::execution::par_unseq, kv_it_1.begin(), kv_it_1.end(), kv_it_2.begin(), kv_it_2.end(), mkv_it.begin()); instead of __gnu_parallel::multiway_merge. [Verdict: supports only two key/value vectors]
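For reference, here is a minimal sketch of what alternative 1 looks like (KeyValuePair, pack, and the main body are my own illustration; compile with -fopenmp):

#include <parallel/algorithm>
#include <utility>
#include <vector>

struct KeyValuePair {
  int key;
  int value;
  bool operator<(const KeyValuePair& o) const { return key < o.key; }
  bool operator<=(const KeyValuePair& o) const { return key <= o.key; }
};

// Pack one key vector and one value vector into a single vector of pairs.
std::vector<KeyValuePair> pack(const std::vector<int>& keys,
                               const std::vector<int>& values) {
  std::vector<KeyValuePair> kv(keys.size());
  for (size_t i = 0; i < keys.size(); ++i)
    kv[i] = {keys[i], values[i]};
  return kv;
}

int main() {
  auto kv_1 = pack({1, 2, 3, 4}, {11, 12, 13, 14});
  auto kv_2 = pack({3, 4, 5, 6}, {23, 24, 25, 26});
  std::vector<KeyValuePair> merged(kv_1.size() + kv_2.size());

  using It = std::vector<KeyValuePair>::iterator;
  std::vector<std::pair<It, It>> seqs = {{kv_1.begin(), kv_1.end()},
                                         {kv_2.begin(), kv_2.end()}};
  __gnu_parallel::multiway_merge(seqs.begin(), seqs.end(), merged.begin(),
                                 merged.size(), std::less<>());
  // ...then move the merged elements back into separate key/value vectors.
  return 0;
}

The packing and unpacking passes are exactly where the extra time and memory go.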

Upvotes: 2

Views: 449

Answers (4)

Mikhail

Reputation: 21749

Is it possible to modify the ZipIterator to make it work?

Yes, but it would require patching __gnu_parallel::multiway_merge. The source of error is this line:

      /** @brief Dereference operator.
      *  @return Referenced element. */
      typename std::iterator_traits<_RAIter>::value_type&
      operator*() const
      { return *_M_current; }

This is a member function of _GuardedIterator, an auxiliary structure used in the multiway_merge implementation. It wraps the _RAIter class, which in your case is ZipIter. By definition, when an iterator is dereferenced (*_M_current), the type of the resulting expression is supposed to be the iterator's reference type. However, this code expects it to be value_type&. In most cases these are the same type: when you dereference an item, you expect to get a reference to that very item. But that is impossible with a zip iterator, because its elements are virtual, created on the fly. That's why the reference type of ZipIter is not a reference type at all; it is actually a value type called ZipRef:

  using reference = ZipRef<std::remove_reference_t<typename std::iterator_traits<IT>::reference>...>;

This is much the same practice as is used with the (much hated) std::vector<bool>.
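Here is a minimal sketch of the issue with a toy proxy iterator (ToyZipIter is my own illustration, not the actual ZipIterator code); dereferencing it yields a temporary proxy object, which cannot bind to value_type&:

#include <tuple>
#include <utility>
#include <vector>

// A toy zip iterator over two int sequences. Its reference type is a
// proxy object created on the fly, not a real reference.
struct ToyZipIter {
  int* k;
  int* v;
  using value_type = std::tuple<int, int>;
  using reference  = std::pair<int&, int&>;  // a proxy, not value_type&

  reference operator*() const { return {*k, *v}; }
};

int main() {
  std::vector<int> keys = {1, 2}, values = {11, 12};
  ToyZipIter it{keys.data(), values.data()};

  auto proxy = *it;                    // fine: copy the proxy object
  // std::tuple<int, int>& bad = *it;  // error: cannot bind the temporary
  //                                   // tuple to a non-const lvalue ref
  std::get<0>(proxy) = 42;             // writes through to keys[0]
  return keys[0] == 42 ? 0 : 1;
}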

So there is no problem with ZipIterator or with how you use the algorithm; it is a non-trivial requirement of the algorithm itself. The next question is: can we get rid of it?

And the answer is yes. You can change _GuardedIterator::operator*() to return reference instead of value_type&. Then you will have a compile error in this line:

      // Default value for potentially non-default-constructible types.
      _ValueType* __arbitrary_element = 0;

      for (_SeqNumber __t = 0; __t < __k; ++__t)
        {
          if(!__arbitrary_element
             && _GLIBCXX_PARALLEL_LENGTH(__seqs_begin[__t]) > 0)
            __arbitrary_element = &(*__seqs_begin[__t].first);
        }

Here the address of an element is taken for some __arbitrary_element. We can store a copy of this element instead since we know ZipRef is cheap to copy and it is default-constructible:

      // Local copy of the element
      _ValueType __arbitrary_element_val;
      _ValueType* __arbitrary_element = 0;

      for (_SeqNumber __t = 0; __t < __k; ++__t)
        {
          if(!__arbitrary_element
             && _GLIBCXX_PARALLEL_LENGTH(__seqs_begin[__t]) > 0) {
            __arbitrary_element_val = *__seqs_begin[__t].first;
            __arbitrary_element = &__arbitrary_element_val;
          }
        }

The same errors will appear in several places in the file multiseq_selection.h. Fix all of them using the same technique.
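Schematically, each of those fixes looks like this (an illustration only, not the actual libstdc++ source):

#include <iterator>

template <typename RAIter>
void schematic_fix(RAIter it)
{
  using ValueType = typename std::iterator_traits<RAIter>::value_type;
  // Before: ValueType& elem = *it;  // fails: a proxy cannot bind to a reference
  ValueType elem = *it;              // After: store a copy of the element instead
  (void)elem;                        // silence the unused-variable warning
}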

Then you will see multiple errors like this one:

./parallel/multiway_merge.h:879:29: error: passing ‘const ZipIter<__gnu_cxx::__normal_iterator<int*, std::vector<int, std::allocator<int> > >, __gnu_cxx::__normal_iterator<int*, std::vector<int, std::allocator<int> > > >’ as ‘this’ argument discards qualifiers [-fpermissive]

They are const-correctness errors. They are due to the fact that you declared it_pairs as auto, which in this particular scenario deduces the type to be std::initializer_list. This is a very peculiar type. For instance, it provides only const access to its elements, even though it itself is not declared const. That's the source of these errors. Change auto to e.g. std::vector and these errors are gone.
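For example (a sketch based on the code from the question):

// An explicit std::vector of iterator pairs instead of a deduced
// std::initializer_list, so that the elements are mutable:
using kv_iterator = decltype(kv_it_1.begin());

std::vector<std::pair<kv_iterator, kv_iterator>> it_pairs = {
    {kv_it_1.begin(), kv_it_1.end()},
    {kv_it_2.begin(), kv_it_2.end()}};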

It should compile fine at this point. Just don't forget to compile with -fopenmp or you will get an "undefined reference to `omp_get_thread_num'" error.
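For example (assuming the translation unit is named main.cpp):

$ g++ -O3 -fopenmp main.cpp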

Here is the output that I see:

$ ./a.out
1:11, 2:12, 3:13, 3:23, 4:14, 4:24, 5:25, 6:26

Upvotes: 5

sakra

Reputation: 65771

Since you need low memory overhead, one possible solution is to have the multiway_merge algorithm operate only on range identifiers and range indices, and to supply the comparison and copy operations as lambda functions. That way the merge algorithm is completely independent of the actual container types and of the key and value types used.

Here is a C++17 solution based on a heap-based algorithm (a priority queue keeps the range with the smallest current element on top):

#include <cassert>
#include <cstdint>
#include <functional>
#include <initializer_list>
#include <iostream>
#include <iterator>
#include <queue>
#include <vector>

using range_type = std::pair<std::uint32_t,std::size_t>;

void multiway_merge(
    std::initializer_list<std::size_t> range_sizes,
    std::function<bool(const range_type&, const range_type&)> compare_func,
    std::function<void(const range_type&)> copy_func)
{
    // lambda compare function for priority queue of ranges
    auto queue_less = [&](const range_type& range1, const range_type& range2) {
        // reverse comparison order of range1 and range2 here,
        // because we require the smallest element to be on top
        return compare_func(range2, range1);
    };
    // create priority queue from all non-empty ranges
    std::priority_queue<
        range_type, std::vector<range_type>, 
        decltype(queue_less)> queue{ queue_less };
    for (std::uint32_t range_id = 0; range_id < range_sizes.size(); ++range_id) {
        if (std::data(range_sizes)[range_id] > 0) {
            queue.emplace(range_id, 0);
        }
    }
    // merge ranges until priority queue is empty
    while (!queue.empty()) {
        range_type top_range = queue.top();
        queue.pop();
        copy_func(top_range);
        if (++top_range.second != std::data(range_sizes)[top_range.first]) {
            // re-insert non-empty range
            queue.push(top_range);
        }
    }
}


int main() {
    std::vector<int> keys_1   = { 1, 2, 3, 4 };
    std::vector<int> values_1 = { 11, 12, 13, 14 };
    std::vector<int> keys_2   = { 3, 4, 5, 6, 7 };
    std::vector<int> values_2 = { 23, 24, 25, 26, 27 };

    std::vector<int> merged_keys;
    std::vector<int> merged_values;

    multiway_merge(
        { keys_1.size(), keys_2.size() },
        [&](const range_type& left, const range_type& right) {
            if (left == right) return false;
            switch (left.first) {
                case 0:
                    assert(right.first == 1);
                    return keys_1[left.second] < keys_2[right.second];
                case 1:
                    assert(right.first == 0);
                    return keys_2[left.second] < keys_1[right.second];
            }
            return false;
        },
        [&](const range_type& range) {
            switch (range.first) {
                case 0:
                    merged_keys.push_back(keys_1[range.second]);
                    merged_values.push_back(values_1[range.second]);
                    break;
                case 1:
                    merged_keys.push_back(keys_2[range.second]);
                    merged_values.push_back(values_2[range.second]);
                    break;
            }
        });
    // copy result to stdout
    std::cout << "keys: ";
    std::copy(
        merged_keys.cbegin(), merged_keys.cend(), 
        std::ostream_iterator<int>(std::cout, " "));
    std::cout << "\nvalues: ";
    std::copy(
        merged_values.cbegin(), merged_values.cend(), 
        std::ostream_iterator<int>(std::cout, " "));
    std::cout << "\n";
}

The algorithm has a time complexity of O(n log(k)) and a space complexity of O(k), where n is the total size of all ranges and k is the number of ranges.

The sizes of all input ranges need to be passed as an initializer list. The example only passes the two input ranges from your example. Extending it to more than two ranges is straightforward (see the sketch below).
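For example, replacing the switch statements with vectors of pointers to the key/value ranges (a sketch; key_ranges and value_ranges are my own names) gives a drop-in replacement for the multiway_merge call in main above that works for any number of ranges:

// Pointers to all key/value vectors, indexable by range id.
std::vector<const std::vector<int>*> key_ranges   = { &keys_1, &keys_2 };
std::vector<const std::vector<int>*> value_ranges = { &values_1, &values_2 };

multiway_merge(
    { keys_1.size(), keys_2.size() },
    [&](const range_type& left, const range_type& right) {
        return (*key_ranges[left.first])[left.second]
             < (*key_ranges[right.first])[right.second];
    },
    [&](const range_type& range) {
        merged_keys.push_back((*key_ranges[range.first])[range.second]);
        merged_values.push_back((*value_ranges[range.first])[range.second]);
    });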

Upvotes: 1

Alexsen

Reputation: 176

I only barely remember this, but you might find it helpful: I'm pretty sure I have seen the "merge k sorted linked lists" problem before. It was solved with something similar to divide and conquer, and its time complexity carried a logarithmic factor; I doubt it is possible to get a better time complexity.

The logic behind it was to minimize iterations over already-merged lists. If you merge the 1st and 2nd lists, then merging the result with the 3rd involves going through the longer, merged list. This method avoided that by merging all the little lists first, then moving to (what I like to call) '2nd layer merging' by merging the once-merged lists, and so on.

This way, if you have k lists of average length n, you do at most ceil(log2(k)) merge rounds, and each round passes over all the elements once, so the total work is roughly O(k*n*log(k)).

Sorry for being a little 'not-so-precise', but I think you might find this piece of information helpful; there is a small sketch of the idea below. Although I'm not familiar with multiway_merge by gnu, so whatever I said might be quite useless too.
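Here is a minimal sketch of that divide-and-conquer idea, merging pairs of lists per round with std::merge (the merge_k name and the use of std::pair for key/value elements are my own illustration, unrelated to GNU's multiway_merge):

#include <algorithm>
#include <iterator>
#include <utility>
#include <vector>

using kv_vector = std::vector<std::pair<int, int>>;

// Merge k sorted vectors in rounds: the 1st layer merges pairs of the
// inputs, the 2nd layer merges those results, and so on, for a total
// of ceil(log2(k)) rounds. std::pair compares by key first by default.
kv_vector merge_k(std::vector<kv_vector> lists) {
  if (lists.empty()) return {};
  while (lists.size() > 1) {
    std::vector<kv_vector> next;
    for (size_t i = 0; i + 1 < lists.size(); i += 2) {
      kv_vector merged;
      merged.reserve(lists[i].size() + lists[i + 1].size());
      std::merge(lists[i].begin(), lists[i].end(),
                 lists[i + 1].begin(), lists[i + 1].end(),
                 std::back_inserter(merged));
      next.push_back(std::move(merged));
    }
    if (lists.size() % 2 == 1)   // odd list out: carry it to the next layer
      next.push_back(std::move(lists.back()));
    lists = std::move(next);
  }
  return std::move(lists.front());
}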

Upvotes: 0

SrPanda

Reputation: 1303

You will probably have to implement something that fits your exact case, and with such large arrays multithreading may not help that much. If you can afford to allocate a full (or close to full) copy of the arrays, one optimization is to use large pages and to make sure the memory you are accessing is not paged out (it is not ideal to run without swap if you plan to run at capacity).

This simple low-memory example works just fine; it is hard to beat sequential I/O. The main bottleneck it has is reallocation: when displacing the used values from the arrs to the ret, multiple reallocations are made at every step_size, but only one of them is expensive. ret.reserve() can consume a "large" amount of time simply because shrinking a buffer is always possible in place, while extending one might not be, and the OS may need to make multiple memory movements.

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <utility>
#include <vector>
#include <stdio.h>

template<typename Pair, bool REVERSED = true>
std::vector<Pair> multi_merge_lm(std::vector<std::vector<Pair>>& arrs, float step){
    size_t final_size = 0, max, i;
    for (i = 0; i < arrs.size(); i++){
        final_size += arrs[i].size();
    }

    float original = (float)final_size;
    size_t step_size = (size_t)((float)(final_size) * step);

    printf("Merge of %zi (%zi bytes) with %zi step size \n", 
        final_size, sizeof(Pair), step_size
    );
    printf("Merge operation size %.*f mb + %.*f mb \n",
        3, ((float)(sizeof(Pair) * (float)final_size) / 1000000),
        3, ((float)(sizeof(Pair) * (float)final_size * step) / 1000000)
    );

    std::vector<Pair> ret;
    while (final_size --> 0){

        for (max = 0, i = 0; i < arrs.size(); i++){
            // select the next biggest item from all the arrays
            if (arrs[i].back().first > arrs[max].back().first){
                max = i;
            }
        }

        // This does not actually resize the vector 
        // unless the capacity is too small
        ret.push_back(arrs[max].back());
        arrs[max].pop_back();

        // This check could be hoisted out of the while loop
        // with some unrolling
        for (i = 0; i < arrs.size(); i++){
            if (arrs[i].empty()){
                // swap-and-pop removes the emptied array without an
                // expensive copy (and avoids self-assignment on the last one)
                std::swap(arrs[i], arrs.back());
                arrs.pop_back();
                break;
            }
        }

        if (ret.size() == ret.capacity()) {
            // Release the memory already consumed from the arrs
            // and reserve more for the ret
            for (std::vector<Pair>& chunk : arrs){
                chunk.shrink_to_fit();
            }
            ret.reserve(ret.size() + step_size);

            // Don't print this on every iteration of the while loop,
            // it will slow down the execution; leave it just for debugging
            printf("\rProgress %i%c / Merge size %zu", 
                (int)((1 - ((float)final_size / original) ) * 100), 
                '%', ret.size()
            );
        }
    }

    printf("\r%*c\r", 40, ' ');
    ret.shrink_to_fit();
    arrs.clear();

    if (REVERSED){
        std::reverse(ret.begin(), ret.end());
    }

    return ret;
}

int main(void) {

    typedef std::pair<uint64_t, uint64_t> Pair;

    int inc = 1;
    int increment = 100000;
    int test_size = 40000000;
    float step_size = 0.05f;

    auto arrs = std::vector<std::vector<Pair>>(5);
    for (auto& chunk : arrs){

        // makes the arrays big and asymmetric and adds 
        // some data to check if it works
        chunk.resize(test_size + increment * inc++);
        for (size_t i = 0; i < chunk.size(); i++){
            chunk[i] = std::make_pair(i, i * -1);
        }

    }
    printf("Generation done \n");

    auto start = std::chrono::steady_clock::now();
    auto merged = multi_merge_lm<Pair>(arrs, step_size);
    auto end = std::chrono::steady_clock::now();

    printf("Time taken: %lfs \n", 
        (std::chrono::duration<double>(end - start)).count()
    );
    for (size_t i = 1; i < merged.size(); i++){
        if (merged[i - 1] > merged[i]){
            printf("Miss placed at index: %zi \n", i - 1);
        }
    }

    merged.clear();
    return 0;
}
Merge of 201500000 (16 bytes) with 10075000 step size
Merge operation size 3224.000 mb + 161.200 mb
Time taken: 166.197639s

Running this through a profiler (AMD uProf in my case) shows that the resizing is quite expensive: the larger you make the step_size, the more efficient the merge becomes.

[Screenshot: AMD uProf run of the code. Names are duplicated because they come from different parts of the code that call the same functions, including calls that the std functions make.]

This rerun is with a 0.5 step size; it is ~2x faster, but now the function consumes 10x more memory than before. Keep in mind that these values are not generic; they may change depending on what hardware you are running on, but the proportions are not going to change that much.

Merge of 201500000 (16 bytes) with 100750000 step size
Merge operation size 3224.000 mb + 1612.000 mb
Time taken: 72.062857s

Two other things you shouldn't forget: std::vector is dynamic, so its actual size might be bigger than you expect, and -O2 can't really do much to optimize heap memory access; if you can't make it sequential, the instructions can only wait.

Upvotes: 0
