I have three Observables. The output of the first observable is required by the second, and the outputs of both the first and second observables are required by the third.
Observable<String> observableOne = Observable
        .just("{1}")
        .map(v -> {
            System.out.println("Executing Observable 1");
            return v;
        });

Observable<String> observableTwo = observableOne
        .map(observableOneValue -> {
            System.out.println("Executing Observable 2");
            return "{2" + observableOneValue + "}";
        });

Observable.zip(
        observableOne,
        observableTwo,
        (observableOneValue, observableTwoValue) -> {
            System.out.println("Executing Observable 3");
            return "{3" + observableOneValue + observableTwoValue + "}";
        }
).blockingSubscribe(System.out::println);
This repeats the execution of the first observable. I can certainly make the first observable cacheable, but I was wondering whether there is a better option than that. In particular, I am looking for some kind of message-passing construct from the first observable to the other two.
I am not sure exactly what you mean by a "message passing construct". cache() will work for your example above, but you mention you don't want to use that.
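For reference, here is a minimal sketch of the cache() route. It assumes RxJava 3 package names (RxJava 2 uses io.reactivex.Observable instead). The CacheSketch class, the run() helper and the log list are hypothetical names added so the order of execution can be inspected; they are not part of the original code.

```java
import io.reactivex.rxjava3.core.Observable; // assumption: RxJava 3 on the classpath

import java.util.ArrayList;
import java.util.List;

class CacheSketch {

    // Builds the three-stage pipeline and returns everything it logged, in order.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // cache() subscribes to the upstream once, on the first subscriber,
        // and replays the recorded items to every later subscriber.
        Observable<String> observableOne = Observable
                .just("{1}")
                .map(v -> {
                    log.add("Executing Observable 1");
                    return v;
                })
                .cache();

        Observable<String> observableTwo = observableOne
                .map(one -> {
                    log.add("Executing Observable 2");
                    return "{2" + one + "}";
                });

        Observable.zip(
                observableOne,
                observableTwo,
                (one, two) -> {
                    log.add("Executing Observable 3");
                    return "{3" + one + two + "}";
                })
                .blockingSubscribe(log::add);

        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Here "Executing Observable 1" is logged once, because the second subscription receives the replayed value instead of re-running the map. The trade-off is that cache() keeps every emitted item in memory for as long as the cached observable is referenced.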
Another option that might fit your use case is ConnectableObservable. It only starts emitting items when you call connect() on it, not when it is subscribed to. Convert your observableOne to a ConnectableObservable by calling publish(), then set up all your subscribers, and finally call connect() on observableOne.
ConnectableObservable<String> observableOne = Observable
        .just("{1}")
        .map(v -> {
            System.out.println("Executing Observable 1");
            return v;
        })
        .publish();

Observable<String> observableTwo = observableOne
        .map(observableOneValue -> {
            System.out.println("Executing Observable 2");
            return "{2" + observableOneValue + "}";
        });

Observable.zip(
        observableOne,
        observableTwo,
        (observableOneValue, observableTwoValue) -> {
            System.out.println("Executing Observable 3");
            return "{3" + observableOneValue + observableTwoValue + "}";
        }
).subscribe(System.out::println);

// Call when all the subscribers are ready
observableOne.connect();
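Notes:
observableOne is now a ConnectableObservable.
The zip uses subscribe instead of blockingSubscribe so that the code goes on to execute the connect call. With blockingSubscribe, the current thread would block waiting for items that are never emitted, because connect would never be reached.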
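A related variant, not taken from the answer above, is the publish overload that accepts a selector function. It shares the source among everything subscribed inside the selector and connects on its own once the result is subscribed, so no separate connect call is needed and blockingSubscribe can be used again. The sketch below assumes RxJava 3 package names; PublishSelectorSketch, run() and log are hypothetical names used only for illustration.

```java
import io.reactivex.rxjava3.core.Observable; // assumption: RxJava 3 on the classpath

import java.util.ArrayList;
import java.util.List;

class PublishSelectorSketch {

    // Builds the pipeline with publish(selector) and returns what it logged, in order.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        Observable<String> observableOne = Observable
                .just("{1}")
                .map(v -> {
                    log.add("Executing Observable 1");
                    return v;
                });

        observableOne
                // "shared" multicasts a single subscription to observableOne
                // to every consumer created inside this selector.
                .publish(shared -> {
                    Observable<String> observableTwo = shared.map(one -> {
                        log.add("Executing Observable 2");
                        return "{2" + one + "}";
                    });
                    return Observable.zip(shared, observableTwo, (one, two) -> {
                        log.add("Executing Observable 3");
                        return "{3" + one + two + "}";
                    });
                })
                .blockingSubscribe(log::add);

        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The sharing only applies to consumers wired up inside the selector. Subscribing to observableOne elsewhere would run the first stage again.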