Reputation: 11926
I'm trying to do the following:
              A
              |
              |
              V
        Observable<B>
           /     \
          /       \
         /         \
        V           V
Observable<C>   Observable<D>
         \         /
          \       /
           V     V
        Observable<E>
I'm new to RxJava and have come across zip, merge, etc., but don't really understand what operators are required for this kind of problem. Any help will be highly appreciated.
PS.
1) While [C] and [D] are both required, [E] can still be created with only one of them. So, it would be nice to have a timeout at this point in case one (or both) of them fail(s).
2) Is it possible to have them run in specific threads - one in computation() and the other in io()?
Here's the conceptual code I have at the moment. It runs linearly:
A -> B -> C -> D -> E
return a2b(a)
    .subscribeOn(Schedulers.io())
    .flatMap(this::b2c)
    .subscribeOn(Schedulers.computation())
    .map(this::c2d)
    .map(this::d2e)
    .cast(E.class)
    .startWith(e -> new E.loadingState());
Ideally, I should use the following function somewhere:
Observable<E> cd2e(C c, D d) {
    return Observable.just(new E());
}
Thanks.
Upvotes: 4
Views: 2237
Reputation: 8227
The publish() operator binds a single observable in a way that allows multiple subscriptions.
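return a2b(a)
    .subscribeOn(Schedulers.io())
    .publish(bObservable ->
        // b2c returns an Observable in the question, hence flatMap;
        // b2d is assumed to do the same (use map if it returns a plain D).
        Observable.zip(
            bObservable.flatMap(this::b2c),
            bObservable.flatMap(this::b2d),
            (c, d) -> combine(c, d)))
    // Drop subscribe() if the method should return Observable<E> instead.
    .subscribe( ... );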
The operator binds the observer chain such that multiple subscriptions can be made; in this case the subscriptions are zipped together, combining the C and D types into the combined E type.
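You are then free to add observeOn() operators to each branch inside publish(), so that the C and D computations run on the schedulers you want, for example Schedulers.computation() for one and Schedulers.io() for the other.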
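To tie this back to the two points in the PS, here is a minimal sketch of the fork and join with a per-branch scheduler and timeout. It assumes RxJava 3 package names (io.reactivex.rxjava3), and the String-based a2b, b2c, b2d and cd2e helpers, the 2-second timeout, and the "no C" / "no D" fallback values are all hypothetical stand-ins for your own types. Supplying a fallback item when a branch fails or times out is what lets zip still produce an E from only one of C and D.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

public class ForkJoinSketch {
    // Hypothetical stand-ins for the question's conversion functions.
    static Observable<String> a2b(String a) { return Observable.just("B(" + a + ")"); }
    static Observable<String> b2c(String b) { return Observable.just("C(" + b + ")"); }
    static Observable<String> b2d(String b) { return Observable.just("D(" + b + ")"); }
    static String cd2e(String c, String d) { return "E(" + c + "," + d + ")"; }

    static Observable<String> a2e(String a) {
        return a2b(a)
            .subscribeOn(Schedulers.io())
            // publish(selector) shares one subscription to B between both branches.
            .publish(shared -> Observable.zip(
                // Branch C: runs on computation(), falls back if it errors or times out.
                shared.observeOn(Schedulers.computation())
                      .flatMap(ForkJoinSketch::b2c)
                      .timeout(2, TimeUnit.SECONDS)
                      .onErrorReturnItem("no C"),
                // Branch D: runs on io(), with the same fallback handling.
                shared.observeOn(Schedulers.io())
                      .flatMap(ForkJoinSketch::b2d)
                      .timeout(2, TimeUnit.SECONDS)
                      .onErrorReturnItem("no D"),
                // Join: combine one C and one D into an E.
                ForkJoinSketch::cd2e));
    }

    public static void main(String[] args) {
        // blockingFirst() is used only to keep the demo synchronous.
        System.out.println(a2e("a").blockingFirst()); // prints E(C(B(a)),D(B(a)))
    }
}
```

In real code the fallback would more likely be an empty Optional or a dedicated "missing" value than a magic string, so that the combining function can tell which side is absent.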
Upvotes: 5