TareK Khoury

Reputation: 13031

RxJava: unsubscribe from inside the subscribe call

I have the following sample code:

val disposable = Observable.interval(interval, TimeUnit.SECONDS)
                .take((maxTimeForRequestInSecond / interval).toInt())
                .subscribe {
                    Manager.fetchAvailableSubscriptions(context, params) { status: WebService.Status, response: String? ->
                        if (status == WebService.Status.Success) {
                            // TODO stop the timer and dismiss observable!
                        }
                    }
                }

How can I dispose of the Observable when a condition is met inside subscribe, as marked by the TODO? (I have no access to the disposable at that point.)

Note: Using RxJava 1.1.6

Upvotes: 1

Views: 70

Answers (1)

Tuby

Reputation: 3253

Your stream is somewhat broken. Please express fetchAvailableSubscriptions as an Observable<ResponseWithStatus>. That way you don't have a callback inside a callback, and the stop condition becomes easy to implement with the takeUntil operator.

You can do that by wrapping Manager.fetchAvailableSubscriptions in Observable.fromCallable (if the call can be made synchronously) or Observable.create (for the callback-based form).
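As a rough illustration of the Observable.create route on RxJava 1.x, here is a minimal sketch. The Status, Callback, Fetcher and ResponseWithStatus types are hypothetical stand-ins, since the real signatures of Manager.fetchAvailableSubscriptions and WebService.Status are not shown in the question. Adapt the names to your own code.

```java
import rx.Observable;

public class CallbackToObservable {
    // Hypothetical stand-ins for WebService.Status and the callback-based API.
    public enum Status { Success, Failure }

    public interface Callback {
        void onResult(Status status, String response);
    }

    public interface Fetcher {
        void fetch(Callback callback);
    }

    // Assumed shape of ResponseWithStatus: a plain holder for both callback arguments.
    public static final class ResponseWithStatus {
        public final Status status;
        public final String response;

        public ResponseWithStatus(Status status, String response) {
            this.status = status;
            this.response = response;
        }
    }

    // Each subscription triggers one fetch and emits its result as a single item.
    public static Observable<ResponseWithStatus> fetchObservable(Fetcher fetcher) {
        return Observable.<ResponseWithStatus>create(subscriber ->
            fetcher.fetch((status, response) -> {
                // Skip delivery if downstream has already unsubscribed.
                if (subscriber.isUnsubscribed()) {
                    return;
                }
                subscriber.onNext(new ResponseWithStatus(status, response));
                subscriber.onCompleted();
            }));
    }
}
```

In the question's setup, the Fetcher would be a small adapter that calls Manager.fetchAvailableSubscriptions(context, params) and forwards its callback arguments.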

Then your stream will look more or less like this:

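// Note: on RxJava 1.x, subscribe() returns a Subscription rather than a Disposable
Disposable disposable = Observable.interval(interval, TimeUnit.SECONDS)
                .take((int)(maxTimeForRequestInSecond / interval))
                .flatMap(__ -> fetchAvailableSubscriptionsObservable())
                // stops after the first successful response; assumes ResponseWithStatus exposes a status field
                .takeUntil(responseWithStatus -> responseWithStatus.status == WebService.Status.Success)
                .subscribe(responseWithStatus -> {
                    // use the response
                });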
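
To see how takeUntil with a predicate behaves here, the following sketch runs the same pipeline shape on RxJava 1.x against a TestScheduler, so no real time passes. The fakeFetch method is a made-up stand-in for the wrapped request: it returns "FAIL" twice and then "OK". The string results are an assumption for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable;
import rx.schedulers.TestScheduler;

public class PollUntilSuccess {
    // Hypothetical stand-in for the wrapped request: fails twice, then succeeds.
    static Observable<String> fakeFetch(AtomicInteger calls) {
        return Observable.fromCallable(() -> calls.incrementAndGet() < 3 ? "FAIL" : "OK");
    }

    public static List<String> run() {
        TestScheduler scheduler = new TestScheduler();
        AtomicInteger calls = new AtomicInteger();
        List<String> seen = new ArrayList<>();

        Observable.interval(1, TimeUnit.SECONDS, scheduler)
                .take(10)                                  // upper bound on attempts
                .flatMap(tick -> fakeFetch(calls))         // one request per tick
                .takeUntil(result -> "OK".equals(result))  // emit the match, then complete
                .subscribe(result -> seen.add(result));

        // Virtual time: offer all ten ticks; only the first three are consumed.
        scheduler.advanceTimeBy(10, TimeUnit.SECONDS);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [FAIL, FAIL, OK]
    }
}
```

The matching item is still delivered to the subscriber before the stream completes, so the successful response can be used inside subscribe without touching the subscription handle.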

Upvotes: 2
