Reputation: 445
I want to merge k
sorted pairwise key/value vectors by keys. Typically, the size n
of the vectors is very large (e.g., n >= 4,000,000,000
).
Consider the following example for k = 2
:
// Input
keys_1 = [1, 2, 3, 4], values_1 = [11, 12, 13, 14]
keys_2 = [3, 4, 5, 6], values_2 = [23, 24, 25, 26]
// Output
merged_keys = [1, 2, 3, 3, 4, 4, 5, 6], merged_values = [11, 12, 13, 23, 14, 24, 25, 26]
Since __gnu_parallel::multiway_merge
is a highly efficient k
-way merge algorithm, I tried to utilize a state-of-the-art zip iterator (https://github.com/dpellegr/ZipIterator) to "combine" the key-value pair vectors.
#include <iostream>
#include <vector>
#include <parallel/algorithm>
#include "ZipIterator.hpp"
int main(int argc, char* argv[]) {
std::vector<int> keys_1 = {1, 2, 3, 4};
std::vector<int> values_1 = {11, 12, 13, 14};
std::vector<int> keys_2 = {3, 4, 5, 6};
std::vector<int> values_2 = {23, 24, 25, 26};
std::vector<int> merged_keys(8);
std::vector<int> merged_values(8);
auto kv_it_1 = Zip(keys_1, values_1);
auto kv_it_2 = Zip(keys_2, values_2);
auto mkv_it = Zip(merged_keys, merged_values);
auto it_pairs = {std::make_pair(kv_it_1.begin(), kv_it_1.end()),
std::make_pair(kv_it_2.begin(), kv_it_2.end())};
__gnu_parallel::multiway_merge(it_pairs.begin(), it_pairs.end(), mkv_it.begin(), 8, std::less<>());
for (size_t i = 0; i < 8; ++i) {
std::cout << merged_keys[i] << ":" << merged_values[i] << (i == 7 ? "\n" : ", ");
}
return 0;
}
However, I get various compilation errors (building with -O3
):
error: cannot bind non-const lvalue reference of type' std::__iterator_traits<ZipIter<__gnu_cxx::__normal_iterator<int*, std::vector<int, std::allocator > >, __gnu_cxx::__normal_iterator<int*, std::vector<int, std::allocator > > >, void>::value_type&' {aka 'std::tuple<int, int>&'} to an rvalue of type' std::tuple<int, int>'
error: cannot convert ‘ZipIter<__gnu_cxx::__normal_iterator<int*, std::vector<int, std::allocator > >, __gnu_cxx::__normal_iterator<int*, std::vector<int, std::allocator > > >::reference*’ {aka ‘ZipRef<int, int>’} to ‘_ValueType’ {aka ‘std::tuple<int, int>*’}
Is it possible to modify the ZipIterator
to make it work?
Or is there a more efficient way of merging k
sorted pairwise key/value vectors by keys?
Considered Alternatives
KeyValuePair
struct
with int key
and int value
members as well as operator<
and operator<=
operators. Move the elements of the key/value vectors into std::vector<KeyValuePair>
s. Call __gnu_parallel::multiway_merge
on the std::vector<KeyValuePair>
s. Move the merged elements back into the key/value vectors.
[Verdict: slow execution, high memory overhead, even with -O3
]std::merge(std::execution::par_unseq, kv_it_1.begin(), kv_it_1.end(), kv_it_2.begin(), kv_it_2.end(), mkv_it.begin());
instead of __gnu_parallel::multiway_merge
.
[Verdict: supports only two key/value vectors]Upvotes: 2
Views: 449
Reputation: 21749
Is it possible to modify the ZipIterator to make it work?
Yes, but it would require patching __gnu_parallel::multiway_merge
. The source of error is this line:
/** @brief Dereference operator.
* @return Referenced element. */
typename std::iterator_traits<_RAIter>::value_type&
operator*() const
{ return *_M_current; }
This is a member function of _GuardedIterator
- an auxiliary structure used in the multiway_merge
implementation. It wraps _RAIter
class which in your case is ZipIter
. By definition, when an iterator is dereferenced (*_M_current
), the type of the returned expression is supposed to be reference
type. However, this code expects it to be value_type&
. In most cases, these are the same types. Indeed, when you dereference an item you expect to get a reference to this very item. However, it is impossible to do with a zip iterator, because its elements are virtual, they are created on the fly. That's why reference
type of ZipIter
is not a reference type at all, it is actually a value type called ZipRef
:
using reference = ZipRef<std::remove_reference_t<typename std::iterator_traits<IT>::reference>...>;
Kind of the same practice that is used with (much hated) vector<bool>
.
So, there is no problem with ZipIterator
, or with how you use the algorithm, it is a non-trivial requirement for the algorithm itself. The next question is, can we get rid of it?
And the answer is yes. You can change _GuardedIterator::operator*()
to return reference
instead of value_type&
. Then you will have a compile error in this line:
// Default value for potentially non-default-constructible types.
_ValueType* __arbitrary_element = 0;
for (_SeqNumber __t = 0; __t < __k; ++__t)
{
if(!__arbitrary_element
&& _GLIBCXX_PARALLEL_LENGTH(__seqs_begin[__t]) > 0)
__arbitrary_element = &(*__seqs_begin[__t].first);
}
Here the address of an element is taken for some __arbitrary_element
. We can store a copy of this element instead since we know ZipRef
is cheap to copy and it is default-constructible:
// Local copy of the element
_ValueType __arbitrary_element_val;
_ValueType* __arbitrary_element = 0;
for (_SeqNumber __t = 0; __t < __k; ++__t)
{
if(!__arbitrary_element
&& _GLIBCXX_PARALLEL_LENGTH(__seqs_begin[__t]) > 0) {
__arbitrary_element_val = *__seqs_begin[__t].first;
__arbitrary_element = &__arbitrary_element_val;
}
}
The same errors will appear in several places in the file multiseq_selection.h
, e.g. here and here. Fix all of them using the similar technique.
Then you will see multiple errors like this one:
./parallel/multiway_merge.h:879:29: error: passing ‘const ZipIter<__gnu_cxx::__normal_iterator<int*, std::vector<int, std::allocator<int> > >, __gnu_cxx::__normal_iterator<int*, std::vector<int, std::allocator<int> > > >’ as ‘this’ argument discards qualifiers [-fpermissive]
They are about const incorrectness. They are due to the fact that you declared it_pairs
as auto
, which in this particular scenario deduced the type to be std::inializer_list
. This is a very peculiar type. For instance, it provides only constant access to its members, even though it itself is not declared const. That's the source of these errors. Change auto
to e.g. std::vector
and these errors are gone.
It should compile find at this point. Just don't forget to compile with -fopenmp
or you will get "undefined reference to `omp_get_thread_num'" error.
Here is the output that I see:
$ ./a.out
1:11, 2:12, 3:13, 3:23, 4:14, 4:24, 5:25, 6:26
Upvotes: 5
Reputation: 65771
Since you need low memory overhead, one possible solution is to have the multiway_merge
algorithm only operate on unique range identifiers and range indices and to supply the comparison and copy operators as lambda functions.
That way the merge algorithm is completely independent of the actual container types and key and value types used.
Here is a C++17 solution which is based on the heap based algorithm described here:
#include <cassert>
#include <cstdint>
#include <functional>
#include <initializer_list>
#include <iostream>
#include <iterator>
#include <queue>
#include <vector>
using range_type = std::pair<std::uint32_t,std::size_t>;
void multiway_merge(
std::initializer_list<std::size_t> range_sizes,
std::function<bool(const range_type&, const range_type&)> compare_func,
std::function<void(const range_type&)> copy_func)
{
// lambda compare function for priority queue of ranges
auto queue_less = [&](const range_type& range1, const range_type& range2) {
// reverse comparison order of range1 and range2 here,
// because we require the smallest element to be on top
return compare_func(range2, range1);
};
// create priority queue from all non-empty ranges
std::priority_queue<
range_type, std::vector<range_type>,
decltype(queue_less)> queue{ queue_less };
for (std::uint32_t range_id = 0; range_id < range_sizes.size(); ++range_id) {
if (std::data(range_sizes)[range_id] > 0) {
queue.emplace(range_id, 0);
}
}
// merge ranges until priority queue is empty
while (!queue.empty()) {
range_type top_range = queue.top();
queue.pop();
copy_func(top_range);
if (++top_range.second != std::data(range_sizes)[top_range.first]) {
// re-insert non-empty range
queue.push(top_range);
}
}
}
int main() {
std::vector<int> keys_1 = { 1, 2, 3, 4 };
std::vector<int> values_1 = { 11, 12, 13, 14 };
std::vector<int> keys_2 = { 3, 4, 5, 6, 7 };
std::vector<int> values_2 = { 23, 24, 25, 26, 27 };
std::vector<int> merged_keys;
std::vector<int> merged_values;
multiway_merge(
{ keys_1.size(), keys_2.size() },
[&](const range_type& left, const range_type& right) {
if (left == right) return false;
switch (left.first) {
case 0:
assert(right.first == 1);
return keys_1[left.second] < keys_2[right.second];
case 1:
assert(right.first == 0);
return keys_2[left.second] < keys_1[right.second];
}
return false;
},
[&](const range_type& range) {
switch (range.first) {
case 0:
merged_keys.push_back(keys_1[range.second]);
merged_values.push_back(values_1[range.second]);
break;
case 1:
merged_keys.push_back(keys_2[range.second]);
merged_values.push_back(values_2[range.second]);
break;
}
});
// copy result to stdout
std::cout << "keys: ";
std::copy(
merged_keys.cbegin(), merged_keys.cend(),
std::ostream_iterator<int>(std::cout, " "));
std::cout << "\nvalues: ";
std::copy(
merged_values.cbegin(), merged_values.cend(),
std::ostream_iterator<int>(std::cout, " "));
std::cout << "\n";
}
The algorithm has a time complexity of O(n log(k)) and a space complexity of O(k), where n is the total size of all ranges and k is the number of ranges.
The sizes of all input ranges need to be passed as an initializer list. The example only passes the two input ranges from your example. Extending the example for more than two ranges is straightforward.
Upvotes: 1
Reputation: 176
I barely remember this, but you might find it helpful - I'm pretty sure I have seen merging K sorted linked lists problem. It was using something similar to Divide and Conquer and was close to logarithmic time complexity. I doubt it's any possible to get a better time complexity.
The logic behind this was to minimize iterations over merged lists. If you merge 1st and 2nd lists, then merging it with 3rd involves going through the longer, merged list. This method avoided this by merging all little lists at first, then moving to(what I like to call) '2nd layer merging' by merging 1-time merged lists.
This way, if your lists' length on average is n, you have to do at most logn iterators, resulting in K*log(n) complexity, where K is amount of the lists you have.
Sorry for being a little 'not-so-precise', but I think you might find this piece of information helpful. Although, I'm not familiar with multiway_merge
by gnu, so whatever I said might be quite useless too.
Upvotes: 0
Reputation: 1303
You will have to implement one that fits that exact case you have and with such a large arrays multi threatening may not be that good if you can afford to allocate a full or close to full copy of the arrays, one optimization you can do is to use large pages and ensure that the memory you are accessing is not paged (it is not ideal to not have swap if you plan to run at capacity).
This simple low memory example works just fine, it is hard to beat sequential i/o, the main bottle neck it has is the use of realloc
, when displacing the used values from the arrs
to the ret
multiple reallocs
at every step_size
are made but only one is expensive, ret.reserve()
can consume a "large" amount of time just because shortening a buffer is always available but extending one might not and multiple memory movements might be need to be made by the os.
#include <vector>
#include <chrono>
#include <stdio.h>
template<typename Pair, typename bool REVERSED = true>
std::vector<Pair> multi_merge_lm(std::vector<std::vector<Pair>>& arrs, float step){
size_t final_size = 0, max, i;
for (i = 0; i < arrs.size(); i++){
final_size += arrs[i].size();
}
float original = (float)final_size;
size_t step_size = (size_t)((float)(final_size) * step);
printf("Merge of %zi (%zi bytes) with %zi step size \n",
final_size, sizeof(Pair), step_size
);
printf("Merge operation size %.*f mb + %.*f mb \n",
3, ((float)(sizeof(Pair) * (float)final_size) / 1000000),
3, ((float)(sizeof(Pair) * (float)final_size * step) / 1000000)
);
std::vector<Pair> ret;
while (final_size --> 0){
for (max = 0, i = 0; i < arrs.size(); i++){
// select the next biggest item from all the arrays
if (arrs[i].back().first > arrs[max].back().first){
max = i;
}
}
// This does not actualy resize the vector
// unless the capacity is too small
ret.push_back(arrs[max].back());
arrs[max].pop_back();
// This check could be extracted of the while
// with a unroll and sort to little
for (i = 0; i < arrs.size(); i++){
if (arrs[i].empty()){
arrs[i] = arrs.back();
arrs.pop_back();
break;
}
}
if (ret.size() == ret.capacity()) {
// Remove the used memory from the arrs and
// realloc more to the ret
for (std::vector<Pair>& chunk : arrs){
chunk.shrink_to_fit();
}
ret.reserve(ret.size() + step_size);
// Dont move this to the while loop, it will slow down
// the execution, leave it just for debugging
printf("\rProgress %i%c / Merge size %zi",
(int)((1 - ((float)final_size / original) ) * 100),
'%', ret.size()
);
}
}
printf("\r%*c\r", 40, ' ');
ret.shrink_to_fit();
arrs.clear();
if (REVERSED){
std::reverse(ret.begin(), ret.end());
}
return ret;
}
int main(void) {
typedef std::pair<uint64_t, uint64_t> Pair;
int inc = 1;
int increment = 100000;
int test_size = 40000000;
float step_size = 0.05f;
auto arrs = std::vector<std::vector<Pair>>(5);
for (auto& chunk : arrs){
// makes the arrays big and asymmetric and adds
// some data to check if it works
chunk.resize(test_size + increment * inc++);
for (int i = 0; i < chunk.size(); i++){
chunk[i] = std::make_pair(i, i * -1);
}
}
printf("Generation done \n");
auto start = std::chrono::steady_clock::now();
auto merged = multi_merge_lm<Pair>(arrs, step_size);
auto end = std::chrono::steady_clock::now();
printf("Time taken: %lfs \n",
(std::chrono::duration<double>(end - start)).count()
);
for (size_t i = 1; i < merged.size(); i++){
if (merged[i - 1] > merged[i]){
printf("Miss placed at index: %zi \n", i - 1);
}
}
merged.clear();
return 0;
}
Merge of 201500000 (16 bytes) with 10075000 step size
Merge operation size 3224.000 mb + 161.200 mb
Time taken: 166.197639s
Running this thru a profiler (ANDuProf in my case) shows that the resizing is quite expensive, the larger you make the step_size
the more efficient it becomes.
(Names are duplicated because they are from different parts of the code that call the same functions, and in this case, calls that the std functions make)
This rerun is with 0.5x, it is ~2x faster but now the function consumes 10x more memory than before and you should keep in mind that this values are not generic, they might change depending on what hardware you are running but the proportion are not going to change that much.
Merge of 201500000 (16 bytes) with 100750000 step size
Merge operation size 3224.000 mb + 1612.000 mb
Time taken: 72.062857s
Two other thing you shouldn't forget is that std::vector
is dynamic and it's actual size might be bigger and O2
cant really do much optimization to the heap memory access, if you cant make it secuencial then the instruction can only wait.
Upvotes: 0