Reputation: 4302
Give two obserable streams {1,3} and {2,4} is there any operator that can produce a result of {1,2,3,4}
I thought merge was for this and went on doing
List<Integer> result=Observable.merge(Observable.fromIterable(Arrays.asList(1,3)),Observable.fromIterable(Arrays.asList(2,4))).toList().blockingGet();
However I later found out that merge "can" interleave but it wont as long as sync operations are performed.
Any pointers?
Upvotes: 0
Views: 46
Reputation: 4302
I have one more way to do that . There is no obvious benefit that i can see from the above given solution . However it just uses a normal zip operator to achieve the result.
List<Integer> intArray1 = Arrays.asList(1,3,5,7);
List<Integer> intArray2 = Arrays.asList(2,4,6,8);
Observable.zip(
Observable.fromIterable(intArray1),
Observable.fromIterable(intArray2),
(numbers1, numbers2) -> new Pair<>(numbers1, numbers2)).
collect(ArrayList<Integer>::new,
(list, pair) -> {
list.add(pair.first);
list.add(pair.second);
}).
subscribe(mergedList -> {
Log.d("List", mergedList.size() + "");
});
Upvotes: 0
Reputation: 70007
There is no standard operator for that. You can try and map items into tuples of (index, value) where index is 0, 2, 4, etc for the first source and 1, 3, 5 for the second. Then use orderedMerge to compare the indices of the tuples, then after the merge, extract the value.
Something like this:
AtomicInteger index1 = new AtomicInteger();
Flowable<Pair<Integer, T>> indexedSource1 = source1
.map(v -> new Pair<>(index1.getAndAdd(2), v));
AtomicInteger index2 = new AtomicInteger(1);
Flowable<Pair<Integer, T>> indexedSource2 = source2
.map(v -> new Pair<>(index2.getAndAdd(2), v));
Flowables.orderedMerge(
(a, b) -> a.first.compareTo(b.first),
indexedSource1,
indexedSource2
)
.map(v -> v.second);
Upvotes: 2
Reputation: 783
If the sources are cold observables and if i need to combine the elements of the sources, I would use zip
:
Observable<Integer> stream1 = Observable.fromIterable(Arrays.asList(1, 3));
Observable<Integer> stream2 = Observable.fromIterable(Arrays.asList(2, 4));
List<Integer> integers = Observable.zip(stream1, stream2, (a, b) -> Observable.just(a, b))
.flatMap(ob -> ob)
.toList()
.blockingGet();
Upvotes: -1