Reputation: 13031
I have the following sample code:
val disposable = Observable.interval(interval, TimeUnit.SECONDS)
    .take((maxTimeForRequestInSecond / interval).toInt())
    .subscribe {
        Manager.fetchAvailableSubscriptions(context, params) { status: WebService.Status, response: String? ->
            if (status == WebService.Status.Success) {
                // TODO stop the timer and dismiss observable!
            }
        }
    }
How can I dispose of the Observable from inside subscribe once a condition is met, as marked by the TODO? (I have no access to the disposable at that point.)
Note: Using RxJava 1.1.6
Upvotes: 1
Views: 70
Reputation: 3253
Your stream is somewhat broken. Please express fetchAvailableSubscriptions as an Observable<ResponseWithStatus>. That way you don't have a callback inside a callback, and stopping becomes easier to implement with the takeUntil operator.
You can do that by wrapping Manager.fetchAvailableSubscriptions in Observable.fromCallable or Observable.create.
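Observable.fromCallable expects a Callable that returns a value, so the callback result has to be turned into a return value first. Below is a minimal plain-Java sketch of that bridge, without RxJava on the classpath. Everything in it is an assumption for illustration: the fetchAvailableSubscriptions stand-in (the real one also takes context and params), the Status enum, the fields of ResponseWithStatus, and the idea that blocking the calling thread is acceptable.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

final class CallbackBridge {
    // Hypothetical stand-in for WebService.Status.
    enum Status { Success, Failure }

    // Hypothetical holder pairing the status with the raw response.
    static final class ResponseWithStatus {
        final Status status;
        final String response;

        ResponseWithStatus(Status status, String response) {
            this.status = status;
            this.response = response;
        }
    }

    // Hypothetical stand-in for Manager.fetchAvailableSubscriptions:
    // it reports its result through a callback instead of returning it.
    static void fetchAvailableSubscriptions(BiConsumer<Status, String> callback) {
        callback.accept(Status.Success, "payload");
    }

    // Blocks until the callback fires, then returns what it received.
    // If the callback never fires, this call never returns.
    static ResponseWithStatus fetchBlocking() {
        CompletableFuture<ResponseWithStatus> future = new CompletableFuture<>();
        fetchAvailableSubscriptions(
                (status, response) -> future.complete(new ResponseWithStatus(status, response)));
        return future.join();
    }

    // A Callable in the shape that Observable.fromCallable accepts.
    static Callable<ResponseWithStatus> asCallable() {
        return CallbackBridge::fetchBlocking;
    }

    public static void main(String[] args) throws Exception {
        ResponseWithStatus result = asCallable().call();
        System.out.println(result.status + " " + result.response);
    }
}
```

With Observable.create you would not need to block: you would call the emitter from inside the callback instead of completing a future.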
Then your stream will look more or less like this:
Disposable disposable = Observable.interval(interval, TimeUnit.SECONDS)
    .take((int) (maxTimeForRequestInSecond / interval))
    .flatMap(__ -> fetchAvailableSubscriptionsObservable())
    // assumes ResponseWithStatus exposes its status as a field
    .takeUntil(responseWithStatus -> responseWithStatus.status == WebService.Status.Success)
    .subscribe(responseWithStatus -> {
        // use the response
    });
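One detail worth knowing: takeUntil with a predicate still delivers the item that matched before completing, so the subscriber does see the successful response. Here is a rough plain-Java model of what take(n) followed by takeUntil(predicate) does. It is not RxJava code, it ignores the interval timing and any asynchrony in flatMap, and the name pollUntil is made up for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;
import java.util.function.Predicate;

final class PollUntilModel {
    // Makes at most maxAttempts calls (the take(n) part), passes every result
    // downstream, and stops right after the first result that satisfies
    // the stop predicate (the takeUntil(predicate) part).
    static <T> List<T> pollUntil(int maxAttempts, IntFunction<T> fetch, Predicate<T> stop) {
        List<T> delivered = new ArrayList<>();
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            T result = fetch.apply(attempt);
            delivered.add(result);      // the matching item is delivered too
            if (stop.test(result)) {
                break;                  // no further attempts after a match
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        // Fake fetch: fails on the first two attempts, succeeds on the third.
        List<String> seen = pollUntil(
                5,
                attempt -> attempt < 2 ? "Failure" : "Success",
                "Success"::equals);
        System.out.println(seen); // prints [Failure, Failure, Success]
    }
}
```

If no attempt ever succeeds, the loop simply runs out after maxAttempts, which mirrors the stream completing once take has let through its last tick.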
Upvotes: 2