Reputation: 8719
I am looking for a way, hopefully using RxJava for consistency, to monitor the progress of multiple subscribers that may be fired at different times. I am aware of how to merge or flatMap subscribers together when they are all fired from one method but I am unaware of a way to do it when they are fired at different times from different methods.
For example, if I have 2 long running tasks attached to button presses. I push button 1 and fire off the observable/subscriber, half way through running I push button 2 to fire off the second observable/subscriber.
I want to enable a button when no tasks are running and disable it when one or more tasks are running.
Is this possible? I am trying to avoid setting instance variable flags as well.
Upvotes: 3
Views: 1820
Reputation: 513
To make this possible, let both long running operations emit an onNext event on a PublishSubject. Combine both Subjects with a zip or combineLatest function and subscribe to this. Once the combine function receives an event, this means that both Subjects have emitted an onNext event, thus both long running operations have finished and you can enable the 3rd button.
private PublishSubject<Boolean> firstSubject = PublishSubject.create();
private PublishSubject<Boolean> secondSubject = PublishSubject.create();
@Override
public void onStart() {
super.onStart();
subscribeToResult();
}
private Observable<Integer> firstOperation() {
return Observable.just(100)
.delay(1000) // takes a while
.subscribe(tick -> firstSubject.onNext(true));
}
private Observable<Integer> firstOperation() {
return Observable.just(200)
.delay(1000) // takes a while
.subscribe(tick -> secondSubject.onNext(true));
}
private void subscribeToResult() {
Observable.zip(
firstSubject,
secondSubject,
(firstResult, secondResult) -> return true
).subscribe(
tick -> thirdButton.setEnabled(true)
)
}
Definitely take a look at the RxJava combine functions.
Upvotes: 1
Reputation: 4606
I would use a separate BehaviorSubject
and scan
to monitor execution status. This is quite similar to an instance variable, but probably it can inspire you to a better solution. Something like this:
private final BehaviorSubject<Integer> mProgressSubject = BehaviorSubject.create(0);
public Observable<String> firstLongRunningOperations() {
return Observable.just("First")
.doOnSubscribe(() -> mProgressSubject.onNext(1))
.finallyDo(() -> mProgressSubject.onNext(-1)));
}
public Observable<String> secondLongRunningOperations() {
return Observable.just("Second")
.doOnSubscribe(() -> mProgressSubject.onNext(1))
.finallyDo(() -> mProgressSubject.onNext(-1));
}
public Observable<Boolean> isOperationInProgress() {
return mProgressSubject.asObservable()
.scan((sum, item) -> sum + item)
.map(sum -> sum > 0);
}
Usage will be like this:
isOperationInProgress()
.subscribe(inProgress -> {
if (inProgress) {
//disable controls
} else {
//enable controls
}
});
With this approach you can have any number of long running operation and you do not have to fire them all. Just don't forget to call doOnSubscribe
and finallyDo
.
PS. Sorry, I didn't test it, but it should work.
Upvotes: 4