Reputation: 3120
I'm working on an Android application that retrieves two different objects via an API that I access via RxJava Observables. To update the UI, I need both results.
How can I run a function as soon as both Observables have completed? It seems like functions like merge
are doing what I'm planning, but as far as I can see they only work for Observables with the same result type or need a composite object that can represent both types.
A simple example:
Observable.just("Hello world")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
Observable.just(1, 2, 3, 4, 5, 6)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
System.out.println(i);
}
});
What would I need to do to run System.out.println("Finished!")
as soon as both Observables have completed their task?
In the particular case of my Android application, I could simply store the results in the actual class, have an updateUi function that only does work when all required data already arrived and call this function from both onCompleted calls, but I feel there is a better way.
Upvotes: 2
Views: 1984
Reputation: 15824
Version 2: emitted items are processed as "side" actions, only onCompleted
event goes through to the merged observable.
Observable<String> stringObservable = Observable.just("Hello world")
.doOnNext(System.out::println)
.ignoreElements();
Observable<Integer> integerObservable = Observable.just(1, 2, 3, 4, 5, 6)
.doOnNext(System.out::println)
.ignoreElements();
Observable.merge(stringObservable, integerObservable)
.subscribe(new Subscriber<Object>() {
@Override
public void onCompleted() {
System.out.println("Finished!");
}
@Override
public void onError(Throwable throwable) {
/*optionally handle if one of the observables calls onError()*/
}
@Override
public void onNext(Object o) { /*not called*/ }
});
Upvotes: 1
Reputation: 615
I think, you need a TaskCoordinator. Keep a simple counter in the taskCoordinator. Every time you signal a success, counter will be decremented. When the counter reaches zero, the callback will be executed. You can see my implementation like this here https://github.com/codefacts/crm-common/blob/master/src/main/java/io/crm/util/TaskCoordinator.java
It is better because, Every time you coordinate tasks, It will come to use. Reactive Stream does not have built in support for task coordination. So a promise library or Task Coordination implementation will be helpful in this case.
Upvotes: 0