Reputation: 4129
I am trying to implement a method to merge the values in two Stream
s based on a Comparator
for the values.
I had a way to do this, where I iterate over the streams and insert the values into a Stream.Builder
, but I have not been able to figure out how to make a lazy-evaluated version (the way many stream operations are), so it can deal with infinite streams as well.
All I want it to do is perform a single merging pass on the input data, not sort the streams (in fact, it is likely that the streams will be disordered; this disorder needs to be preserved).
static Stream<E> merge(Stream<E> first, Stream<E> second, Comparator<E> c)
How can I lazily merge two streams like this?
If I were doing this with two Queue
s as input and some Consumer
as output, it would be fairly simple:
void merge(Queue<E> first, Queue<E> second, Consumer<E> out, Comparator<E> c){
while(!first.isEmpty() && !second.isEmpty()
if(c.compare(first.peek(), second.peek()) <= 0)
out.accept(first.remove());
else
out.accept(second.remove());
for(E e:first)
out.accept(e);
for(E e:second)
out.accept(e);
}
But I need to do this with lazy evaluation, and streams.
To address the comments, here are some example inputs and the result:
Example 1:
merge(
Stream.of(1, 2, 3, 1, 2, 3),
Stream.of(2, 2, 3, 2, 2, 2),
Comparator.naturalOrder()
);
would return a stream that would produce this sequence:
1, 2, 2, 2, 3, 3, 1, 2, 2, 2, 2, 3
Example 2:
merge(
Stream.iterate(5, i->i-1),
Stream.iterate(1, i->i+1),
Comparator.naturalOrder()
);
would return an infinite (well, an INT_MAX + 5
item) stream that would produce the sequence:
1, 2, 3, 4, 5, 5, 4, 3, 2, 1, 0, -1 ...
As you can see, this is not merely concat(first,second).sort()
, since (a) you can't sort infinite streams, and (b) even when you can sort the streams, it does not give the desired result.
Upvotes: 13
Views: 10453
Reputation: 1255
Iterables.mergeSorted() from Guava
public static <T> Iterable<T> mergeSorted(Iterable<? extends Iterable<? extends T>> iterables,
Comparator<? super T> comparator)
Upvotes: 0
Reputation: 198103
You need to implement a Spliterator
, rather than going through Stream.Builder
. For this, you might even just go through an Iterator
, since it's a fairly sequential operation. Using Guava lightly,
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
Iterators.mergeSorted(
Arrays.asList(stream1.iterator(), stream2.iterator()),
comparator),
Spliterator.ORDERED),
false /* not parallel */ );
Upvotes: 13