Rajath

Reputation: 11926

Using RxJava to fork into tasks and combine results

I'm trying to do the following:

                           A
                           |
                           |
                           V
                     Observable<B>
                           /\
                          /  \
                         /    \
                        V      V
            Observable<C>       Observable<D>
                        \      /
                         \    /
                          V  V
                      Observable<E>

  1. Given an input [A], an async call returns [B].
  2. Two tasks, each of which needs [B], run in parallel and return [C] and [D] respectively.
  3. The two results are combined into [E], which is then shown in the UI.

I'm new to RxJava and have come across zip, merge, etc., but don't really understand what operators are required for this kind of problem. Any help will be highly appreciated.

PS. 1) Ideally both [C] and [D] are available, but [E] can still be created with only one of them. So it would be nice to have a timeout at this point in case one (or both) of them fails.
2) Is it possible to run them on specific threads, one on computation() and the other on io()?

Here's the conceptual code I have at the moment. It runs linearly:
A -> B -> C -> D -> E

    return a2b(a)
            .subscribeOn(Schedulers.io())
            .flatMap(this::b2c)
            .subscribeOn(Schedulers.computation())
            .map(this::c2d)
            .map(this::d2e)
            .cast(E.class)
            .startWith(e -> new E.loadingState());

Ideally, I should use the following function somewhere:

    Observable<E> cd2e(C c, D d) {
        return Observable.just(new E());
    }

Thanks.

Upvotes: 4

Views: 2237

Answers (1)

Bob Dalgleish

Reputation: 8227

The publish() operator, in the overload that takes a function, shares a single subscription to the source among every consumer created inside that function, so [B] is produced only once.

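    return a2b(a)
            .subscribeOn(Schedulers.io())
            .publish(bObservable ->
                    // flatMap assumes b2c and b2d each return an Observable,
                    // as b2c does in the question; use map() if they return plain values.
                    Observable.zip(bObservable.flatMap(this::b2c),
                                   bObservable.flatMap(this::b2d),
                                   (c, d) -> combine(c, d)))
            .subscribe( ... );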

Inside the function passed to publish(), bObservable can be subscribed to more than once without re-running a2b. Here the two branches are zipped, so each [C] is paired with the corresponding [D] and the pair is combined into [E].

You are then free to add an observeOn() operator to each branch so that its work runs on the scheduler you want, for example Schedulers.computation() for one branch and Schedulers.io() for the other.
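To make the shape of the dataflow concrete (one shared [B], two branches on different thread pools, a timeout with a fallback on each branch, then a combine step), here is a minimal JDK-only sketch. It is an analogy built on CompletableFuture, not the RxJava code above: the shared future loosely stands in for publish(), thenCombine for zip, completeOnTimeout (Java 9+) for a timeout with a default value, and the executor arguments for observeOn(). The String types, the method bodies, the "no C"/"no D" fallbacks, and the pool choices are all assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ForkJoinSketch {
    // Hypothetical stand-ins for the question's a2b, b2c, b2d and cd2e;
    // plain Strings replace the types A..E.
    static String a2b(String a) { return a + ">B"; }
    static String b2c(String b) { return b + ">C"; }
    static String b2d(String b) { return b + ">D"; }
    static String cd2e(String c, String d) { return "E[" + c + ", " + d + "]"; }

    static String run(String a, long timeoutMillis) {
        // Assumed pools: one playing the role of an I/O scheduler,
        // one playing the role of a computation scheduler.
        ExecutorService io = Executors.newCachedThreadPool();
        ExecutorService cpu = Executors.newFixedThreadPool(2);
        try {
            // A -> B runs once; both branches reuse the same future.
            CompletableFuture<String> b =
                    CompletableFuture.supplyAsync(() -> a2b(a), io);

            // Branch C on the "computation" pool, with a fallback value
            // if it times out or fails.
            CompletableFuture<String> c = b
                    .thenApplyAsync(ForkJoinSketch::b2c, cpu)
                    .completeOnTimeout("no C", timeoutMillis, TimeUnit.MILLISECONDS)
                    .exceptionally(err -> "no C");

            // Branch D on the "io" pool, with the same kind of fallback.
            CompletableFuture<String> d = b
                    .thenApplyAsync(ForkJoinSketch::b2d, io)
                    .completeOnTimeout("no D", timeoutMillis, TimeUnit.MILLISECONDS)
                    .exceptionally(err -> "no D");

            // Join the two branches into E and wait for the result.
            return c.thenCombine(d, ForkJoinSketch::cd2e).join();
        } finally {
            io.shutdown();
            cpu.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run("A", 2000)); // prints E[A>B>C, A>B>D]
    }
}
```

Because each branch falls back to a placeholder instead of failing, the combine step always runs, which matches the requirement that [E] can be built from only one of [C] and [D]. In RxJava the same idea would be expressed per branch inside the publish() function rather than on futures.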

Upvotes: 5
