Reputation: 1994
I have this scenario: a stream which produces items fairly quickly a subscriber that make a lot of work on the UI Thread.
When a certain condition is meet I would like to unsubscribe to save the extra work from happening.
I would also like to do some extra work in the event of re-subscription.
Example: A network event produce updates to items, a model listen to it and updates the items and expose a new observable with the updated items. a screen subscribes to the model's stream and update the GUI accordingly. a new screen opens on top so we there is not need for the previous screen to update it's GUI anymore -> unsubscribe for it (while the model still being updated) the new screen closes which makes the previous screen visible again, so it needs to resubscribe and because it hasn't listened for a time it needs to refresh it's views.
I have some solutions but I feel there is probably a better one:
I can use filter to check for the condition. and just filter those items, however I prefer a full unsubscribe as it will be more efficient.
getItemsUpdateObs().filter(o -> isScreenVisible()).subscribe(...);
I can use to subscription and listen for the condition event. then subscribe/unsubscribe according to that event with a fresh new subscription each time - this is a procedural solution rather then a functional one. I thought of using the window operator and flatten it, but I'm not sure this is a straightforward solution.
isScreenVisibleObs().subscribe(isVisible -> {
if (isVisible){
subscription = getItemsUpdateObs().subscribe(...)
} else {
if ( subscription != null) subscription.unsubscribe()
}});
Any thoughts?
I would like to do it like this:
getItemsUpdateObs().compose(listenWhen(isScreenVisibleObs()).subscribe(...)
where listenWhen is the question here...
Upvotes: 3
Views: 1881
Reputation: 8227
There is a very useful operator within RxJava called switchMap()
. It will switch subscriptions based on an observable.
First set up your observable based on whether your screen is active:
Observable<Boolean> isActive;
Then, switch based on that observable:
isActive
.switchMap( active -> active ? getItemsUpdateObs() : Observable.never() )
.subscribe( ... );
This will only subscribe to the items update when the screen is active. You will need one observable per screen, but they are cheap. This way, the observable chain maintains its own state, and the correct management and timing of subscribing and re-subscribing are handled for you.
Upvotes: 1
Reputation: 13471
I would not even concern if the previous subsciption finish or not. In the next window where you want to create a new subscriber for the new request, just unsubscribe the subscription and all subscriber will be automatically unsubscribe.
Then create a new subscriber and subscribe again to the subscription.
/**
* You can in any moment unsubscribe all subscriber of a subscription and create a new one again.
* @throws InterruptedException
*/
@Test
public void subscribeAndUnsubscribe() throws InterruptedException {
Integer[] numbers = {0, 1, 2, 4, 5, 6};
Observable<Integer> observable = Observable.from(numbers);
Subscription subscription = observable.subscribeOn(Schedulers.newThread()).subscribe(createSubscriber());
Thread.sleep(2000);
subscription.unsubscribe();
subscription = observable.subscribeOn(Schedulers.newThread()).subscribe(createSubscriber());
Thread.sleep(10000);
System.out.println(subscription.isUnsubscribed());
}
private ActionSubscriber createSubscriber() {
return new ActionSubscriber(number -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
System.out.println("preparing to unsubscribe");
}
System.out.println("Subscriber number:" + number);
},
System.out::println,
() -> System.out.println("Subscriber End of pipeline"));
}
You can see more examples here https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/creating/ObservableSubscription.java
Upvotes: 0
Reputation: 1005
Save the return type of Observable.subscribe() method. in onActivityResult of the previous screen re-initiate a request again
Subscription subscription = Observable.subscribe(new Subscriber<Type>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String responseString) {
}
});
Whenever you give next request just check with the previous call status(completed/running) as follows. if not completed cancel the previous call and give the new request as follows.
if(subscription!=null && !subscription.isUnsubscribed()){
//Cancel(unSubscribe) the request if running(not completed)
subscription.unsubscribe();
//START A NEW REQUEST HERE
}else{
//already completed so START A NEW REQUEST
}
Upvotes: 1