Reputation: 1238
I'm combining different operations in an Android app with RxJava. I want the flow either to finish successfully and emit its items in onNext(), or to end with an onError after 10 seconds.
I have tried something like this with timeout:
Observable.from(list)
.doOnNext(new Action1<List<String>>() {
@Override
public void call(List<String> list) {
//do something here
}
})
.filter(new Func1<List<String>, Boolean>() {
@Override
public Boolean call(List<String> list) {
return list != null;
}
})
.flatMap(new Func1<List<String>, Observable<MyResponse>>() {
@Override
public Observable<MyResponse> call(List<String> list) {
//flatmap something here
return Observable.just(new MyResponse(list));
}
})
.flatMap(new Func1<MyResponse, Observable<AnotherResponse>>() {
@Override
public Observable<AnotherResponse> call(MyResponse myResponse) {
//do something here
return Observable.just(new AnotherResponse(myResponse));
}
})
.timeout(10, TimeUnit.SECONDS)
.subscribe(new Subscriber<AnotherResponse>() {
//do Subscription stuff here
});
But this throws a timeout in every case. I just want to jump to onError if the flow listed above has not finished successfully within 10 seconds. Any suggestions on how I could achieve this?
Upvotes: 1
Views: 194
Reputation: 1827
Maybe this could help you.
private Observable<String> myMethod(final List<String> list) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override public void call(final Subscriber<? super String> subscriber) {
// Your long lasting operation
Observable<String> listObservable = Observable.from(list);
// This is just my way of simulating a slow operation (I have 10 items in my list)
Observable<String> delayedObservable = listObservable.zipWith(Observable.interval(2, TimeUnit.SECONDS), new Func2<String, Long, String>() {
@Override public String call(String s, Long aLong) {
return s;
}
});
delayedObservable.subscribe(subscriber);
Runnable r = new Runnable() {
@Override public void run() {
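// This thread enforces the timeout; it is up to you whether you keep it this way.
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
// Restore the interrupt flag and stop without signalling a timeout.
Thread.currentThread().interrupt();
return;
}
// Signal the timeout only if the subscriber is still active.
// onError is terminal, so onCompleted must not be called after it.
if (!subscriber.isUnsubscribed()) {
subscriber.onError(new TimeoutException());
}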
}
};
Thread thread = new Thread(r);
thread.start();
}
});
}
This worked for me in a very basic situation; I hope it helps you.
Upvotes: 1
Reputation: 12982
As Christopher said in his comment, the reason you are getting the error is that timeout will throw a TimeoutException whenever a non-finished Observable (onCompleted was not yet called) fails to produce the next onNext within the set timeout.
Since I am not sure what your source Observable (or rather, the Observables inside the flatMaps) is doing, I would first check whether it should actually produce an onCompleted (probably after at least one onNext) or whether it stays open "by design" (the source may be an open-ended stream, such as the network state of your device). If the source is open-ended by design, you can artificially introduce an onCompleted after the first onNext by adding take(1) to your chain.
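To make the difference concrete, here is a minimal plain-Java sketch of the behaviour described above. It is not RxJava code and not how RxJava implements timeout. The class TimeoutModel, its methods consume and openEnded, and the COMPLETED sentinel are all hypothetical names invented for this illustration. A BlockingQueue stands in for the source, a null result from poll stands in for the TimeoutException, and the limit argument plays the role of take(n).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical plain-Java model of a per-item timeout; this is NOT RxJava code.
class TimeoutModel {
    // Sentinel that plays the role of onCompleted in this model.
    static final String COMPLETED = "<completed>";

    /**
     * Reads events until the source completes, until `limit` items have arrived
     * (the analogue of take(limit)), or until no event shows up within timeoutMillis.
     * Returns "completed [...]" or "timeout [...]" with the items seen so far.
     */
    static String consume(BlockingQueue<String> source, int limit, long timeoutMillis) {
        List<String> seen = new ArrayList<>();
        try {
            while (seen.size() < limit) {
                // The clock restarts for every event, like a per-item timeout.
                String event = source.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (event == null) {
                    return "timeout " + seen; // analogue of onError(TimeoutException)
                }
                if (COMPLETED.equals(event)) {
                    break; // analogue of onCompleted
                }
                seen.add(event); // analogue of onNext
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted " + seen;
        }
        return "completed " + seen;
    }

    // Builds an open-ended source: it emits the given events and then stays silent.
    static BlockingQueue<String> openEnded(String... events) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (String event : events) {
            queue.add(event);
        }
        return queue;
    }

    public static void main(String[] args) {
        // No limit: the source never completes, so the wait for a second item times out.
        System.out.println(consume(openEnded("a"), Integer.MAX_VALUE, 200)); // timeout [a]
        // Limit of 1: the consumer finishes after the first item and never waits again.
        System.out.println(consume(openEnded("a"), 1, 200));                 // completed [a]
    }
}
```

In this model the first call times out even though the item arrived immediately, because nothing ever signals completion. That mirrors the situation in the question. The second call corresponds to adding take(1): the stream is considered finished after the first item, so no further waiting occurs.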
Upvotes: 3