Muhammad Ahmed AbuTalib

Reputation: 4302

Interleaving items with RxJava2

Given two observable streams {1,3} and {2,4}, is there an operator that produces {1,2,3,4}?

I thought merge was for this, so I wrote:

List<Integer> result = Observable.merge(
        Observable.fromIterable(Arrays.asList(1, 3)),
        Observable.fromIterable(Arrays.asList(2, 4)))
    .toList()
    .blockingGet();

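However, I later found out that merge can interleave, but it won't when the sources emit synchronously: the first source emits all of its items before the second is subscribed, so the result here is {1,3,2,4}.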

Any pointers?

Upvotes: 0

Views: 46

Answers (3)

Muhammad Ahmed AbuTalib

Reputation: 4302

Here is one more way to do it. I can't see an obvious benefit over the solution given above, but it uses only the standard zip operator to achieve the result.

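        // Pair and Log are assumed here to be android.util.Pair and android.util.Log.
        List<Integer> intArray1 = Arrays.asList(1, 3, 5, 7);
        List<Integer> intArray2 = Arrays.asList(2, 4, 6, 8);
        Observable.zip(
                Observable.fromIterable(intArray1),
                Observable.fromIterable(intArray2),
                // Pair up the items that share the same position in each source.
                (number1, number2) -> new Pair<>(number1, number2))
                // Flatten each pair back into one list, first element before second.
                .collect(ArrayList<Integer>::new,
                        (list, pair) -> {
                            list.add(pair.first);
                            list.add(pair.second);
                        })
                .subscribe(mergedList -> {
                    Log.d("List", mergedList.size() + "");
                });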

Upvotes: 0

akarnokd

Reputation: 70007

There is no standard operator for that. You can map the items into tuples of (index, value), where the index is 0, 2, 4, etc. for the first source and 1, 3, 5, etc. for the second. Then use orderedMerge (from RxJava2Extensions) to merge the tuples by comparing their indices, and after the merge, extract the value.

Something like this:

AtomicInteger index1 = new AtomicInteger();
Flowable<Pair<Integer, T>> indexedSource1 = source1
    .map(v -> new Pair<>(index1.getAndAdd(2), v));

AtomicInteger index2 = new AtomicInteger(1);
Flowable<Pair<Integer, T>> indexedSource2 = source2
    .map(v -> new Pair<>(index2.getAndAdd(2), v));

Flowables.orderedMerge(
    (a, b) -> a.first.compareTo(b.first), 
    indexedSource1, 
    indexedSource2
)
.map(v -> v.second);
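
To see why even and odd index tags produce the interleaved order, here is a plain-Java sketch of the same idea without any Rx types. The names `IndexTagSketch` and `interleaveByIndex` are made up for illustration, and a sort over the tagged items stands in for the ordered merge; it is not how `orderedMerge` itself works internally.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class IndexTagSketch {

    // Hypothetical helper: tags items of the first list with 0, 2, 4, ...
    // and items of the second list with 1, 3, 5, ..., then orders by tag.
    static <T> List<T> interleaveByIndex(List<T> first, List<T> second) {
        List<Map.Entry<Integer, T>> tagged = new ArrayList<>();

        int index1 = 0; // even indices for the first source
        for (T value : first) {
            tagged.add(new SimpleEntry<>(index1, value));
            index1 += 2;
        }

        int index2 = 1; // odd indices for the second source
        for (T value : second) {
            tagged.add(new SimpleEntry<>(index2, value));
            index2 += 2;
        }

        // Ordering by index plays the role of the ordered merge.
        tagged.sort(Map.Entry.comparingByKey());

        // Drop the index again and keep only the values.
        List<T> result = new ArrayList<>();
        for (Map.Entry<Integer, T> entry : tagged) {
            result.add(entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [1, 2, 3, 4]
        System.out.println(interleaveByIndex(Arrays.asList(1, 3), Arrays.asList(2, 4)));
    }
}
```

In this sketch, items left over from a longer list keep their place in index order, so `[1, 3, 5, 7]` and `[2]` give `[1, 2, 3, 5, 7]`.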

Upvotes: 2

suenda

Reputation: 783

If the sources are cold observables and I need to combine their elements pairwise, I would use zip:

    Observable<Integer> stream1 = Observable.fromIterable(Arrays.asList(1, 3));
    Observable<Integer> stream2 = Observable.fromIterable(Arrays.asList(2, 4));

    List<Integer> integers = Observable.zip(stream1, stream2, (a, b) -> Observable.just(a, b))
            .flatMap(ob -> ob)
            .toList()
            .blockingGet();
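
One thing to keep in mind with a zip-based interleave: zip pairs items by position and completes when the shorter source completes, so trailing items without a partner are dropped. The plain-Java sketch below mimics that pairing with two iterators; `ZipInterleaveSketch` and `zipInterleave` are hypothetical names used only for illustration, not RxJava API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ZipInterleaveSketch {

    // Hypothetical helper: takes one item from each list in turn and stops
    // as soon as either list runs out, like a positional pairing would.
    static <T> List<T> zipInterleave(List<T> first, List<T> second) {
        List<T> result = new ArrayList<>();
        Iterator<T> a = first.iterator();
        Iterator<T> b = second.iterator();
        while (a.hasNext() && b.hasNext()) {
            result.add(a.next());
            result.add(b.next());
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [1, 2, 3, 4]
        System.out.println(zipInterleave(Arrays.asList(1, 3), Arrays.asList(2, 4)));
        // prints [1, 2] because 3 and 5 have no partner
        System.out.println(zipInterleave(Arrays.asList(1, 3, 5), Arrays.asList(2)));
    }
}
```

If the sources can differ in length and the leftover items matter, an index-based ordering is likely the better fit.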

Upvotes: -1
