Eve

Reputation: 1238

RxJava: send onError() after 10 seconds

I'm combining several operations in an Android app with RxJava. I want the flow either to finish successfully and deliver its items in onNext(), or to signal onError() if it has not finished after 10 seconds.

I have tried something like this with timeout:

Observable.from(list)
            .doOnNext(new Action1<List<String>>() {
                @Override
                public void call(List<String> list) {
                    //do something here
                }
            })
            .filter(new Func1<List<String>, Boolean>() {
                @Override
                public Boolean call(List<String> list) {
                    return list != null;
                }
            })
            .flatMap(new Func1<List<String>, Observable<MyResponse>>() {
                @Override
                public Observable<MyResponse> call(List<String> list) {
                    //flatmap something here
                    return Observable.just(new MyResponse(list));
                }

            })
            .flatMap(new Func1<MyResponse, Observable<AnotherResponse>>() {
                @Override
                public Observable<AnotherResponse> call(MyResponse myResponse) {
                    //do something here
                    return Observable.just(new AnotherResponse(myResponse));
                }
            })
            .timeout(10, TimeUnit.SECONDS)
            .subscribe(new Subscriber<AnotherResponse>() {
                //do Subscription stuff here
            });

But this throws a timeout in every case. I just want to jump to onError if the flow above has not finished successfully within 10 seconds. Any suggestions on how I could achieve this?

Upvotes: 1

Views: 194

Answers (2)

stephanmantel

Reputation: 1827

Maybe this could help you.

private Observable<String> myMethod(final List<String> list) {
    return Observable.create(new Observable.OnSubscribe<String>() {
        @Override public void call(final Subscriber<? super String> subscriber) {

            // Your long lasting operation
            Observable<String> listObservable = Observable.from(list);

            // This is just my way to make slow operation (I have 10 items in my list)
            Observable<String> delayedObservable = listObservable.zipWith(
                    Observable.interval(2, TimeUnit.SECONDS),
                    new Func2<String, Long, String>() {
                @Override public String call(String s, Long aLong) {
                    return s;
                }
            });
            delayedObservable.subscribe(subscriber);

            Runnable r = new Runnable() {
                @Override public void run() {
                    // This thread implements the timeout; whether to keep this approach is up to you.
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
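                    // Signal the timeout only if the subscriber is still active.
                    // onError is terminal, so onCompleted must not follow it.
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onError(new TimeoutException());
                    }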
                }
            };
            Thread thread = new Thread(r);
            thread.start();
        }
    });
}

This worked for me in a very basic situation; I hope it helps.

Upvotes: 1

david.mihola

Reputation: 12982

As Christopher said in his comment, the reason you are getting the error is that timeout will throw a TimeoutException whenever a non-finished Observable (onCompleted was not yet called) fails to produce the next onNext within the set timeout.

Since I am not sure what your source Observable, or rather the Observables inside the flatMaps, are doing, I would first check whether the source should actually produce an onCompleted (probably after at least one onNext) or whether it stays open "by design" (for example, an open-ended stream such as the network state of your device). If the source is open-ended by design, you can artificially introduce an onCompleted after the first onNext by adding take(1) to your chain before the timeout operator.
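
To make the reasoning above concrete, here is a minimal sketch in plain Java. It is not RxJava code, only a simplified model of the behavior described: a timer that restarts on every event and fires if the next onNext or onCompleted does not arrive in time. The class name TimeoutModel and its methods are hypothetical, and the model assumes that a completion, if present, arrives no earlier than the last item.

```java
import java.util.OptionalLong;

// Simplified, hypothetical model of a per-item timeout. Not RxJava's implementation.
class TimeoutModel {

    /**
     * @param itemTimesMs   ascending times (ms after subscription) at which items arrive
     * @param completedAtMs time of onCompleted, or empty if the stream never completes
     * @param timeoutMs     maximum allowed gap between consecutive events
     * @return the time at which the timeout fires, or empty if it never fires
     */
    static OptionalLong timeoutFiresAt(long[] itemTimesMs, OptionalLong completedAtMs, long timeoutMs) {
        long previous = 0; // the timer starts at subscription
        for (long t : itemTimesMs) {
            if (t - previous > timeoutMs) {
                return OptionalLong.of(previous + timeoutMs); // the gap before this item was too long
            }
            previous = t; // each item restarts the timer
        }
        // After the last item, only a timely onCompleted stops the timer.
        if (completedAtMs.isPresent() && completedAtMs.getAsLong() - previous <= timeoutMs) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(previous + timeoutMs);
    }

    /** Models take(1): the stream completes at the moment its first item arrives. */
    static OptionalLong timeoutFiresAtWithTakeOne(long[] itemTimesMs, OptionalLong completedAtMs, long timeoutMs) {
        if (itemTimesMs.length == 0) {
            return timeoutFiresAt(itemTimesMs, completedAtMs, timeoutMs);
        }
        long first = itemTimesMs[0];
        return timeoutFiresAt(new long[] {first}, OptionalLong.of(first), timeoutMs);
    }

    public static void main(String[] args) {
        long[] items = {1_000, 2_000}; // two quick items, then silence
        // Open-ended source: no onCompleted, so the timer started by the last item fires.
        System.out.println(timeoutFiresAt(items, OptionalLong.empty(), 10_000));
        // Same source with take(1): it completes with the first item, so nothing fires.
        System.out.println(timeoutFiresAtWithTakeOne(items, OptionalLong.empty(), 10_000));
    }
}
```

In this model the open-ended source times out 10 seconds after its last item even though every item arrived quickly, which matches the "timeout in any case" symptom from the question. With take(1) the stream ends at the first item and the timer is never left running.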

Upvotes: 3
