chhil

Reputation: 460

How to prevent timeout(...) from cancelling the stream emission?

Here is what I want to do:

Perform a series of tasks. Each task must complete within x seconds. If it does not, log a timeout and continue with the next emission.

public static void main(String[] args) {
    Observable<String> source = Observable.create(emitter -> {
        emitter.onNext(task(0, "A"));
        emitter.onNext(task(2, "B")); // this one times out
        emitter.onNext(task(0, "C"));
        emitter.onNext(task(0, "D"));
        emitter.onComplete();
    });

    source.subscribeOn(Schedulers.computation())
          .timeout(1, TimeUnit.SECONDS, Observable.just("timeout"))
          .blockingSubscribe(s -> System.out.println("RECEIVED: " + s));
}

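private static String task(int i, String string) {
    try {
        TimeUnit.SECONDS.sleep(i); // simulate work that takes i seconds
    } catch (InterruptedException e) {
        // ignored: the sleep may be interrupted when the timeout disposes the stream
    }
    return string;
}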

Actual result

RECEIVED: A
RECEIVED: timeout

Expected result

RECEIVED: A
RECEIVED: timeout
RECEIVED: C
RECEIVED: D

Basically I don't want the emissions to terminate on a timeout.

Upvotes: 0

Views: 151

Answers (1)

akarnokd

Reputation: 69997

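When timeout(...) with a fallback fires, it disposes the upstream and switches to the fallback source, so nothing after the timed-out item is delivered. You could instead defer the execution of each task and apply timeout to each one separately: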

Observable<String> source = Observable.create(
        (ObservableEmitter<Callable<String>> emitter) -> {
    emitter.onNext(() -> task(0, "A"));
    emitter.onNext(() -> task(2, "B")); // this one times out
    emitter.onNext(() -> task(0, "C"));
    emitter.onNext(() -> task(0, "D"));
    emitter.onComplete();
})
.concatMap(call ->
    // Each task gets its own inner Observable, so a timeout only ends that inner stream
    Observable.fromCallable(call)
        .subscribeOn(Schedulers.computation())
        .timeout(1, TimeUnit.SECONDS, Observable.just("timeout"))
);

source.blockingSubscribe(s -> System.out.println("RECEIVED: " + s));
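
For comparison, the same "one timeout per task" idea can be sketched without RxJava, using only java.util.concurrent. This is an illustrative analogue and not the RxJava approach above: the class name PerTaskTimeout and the helper runAll are hypothetical, and the delays are in milliseconds rather than seconds. Each task is submitted on its own, and a timeout replaces only that task's result instead of ending the whole run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper class, for illustration only.
class PerTaskTimeout {

    // Simulates work that takes the given number of milliseconds.
    static String task(long millis, String value) throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(millis);
        return value;
    }

    // Runs the tasks one after another; each task gets its own timeout.
    static List<String> runAll(List<Callable<String>> tasks, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<String> results = new ArrayList<>();
        try {
            for (Callable<String> t : tasks) {
                Future<String> future = executor.submit(t);
                try {
                    results.add(future.get(timeoutMillis, TimeUnit.MILLISECONDS));
                } catch (TimeoutException e) {
                    future.cancel(true);    // interrupt the slow task
                    results.add("timeout"); // record the timeout and keep going
                } catch (InterruptedException | ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            }
        } finally {
            executor.shutdownNow();
        }
        return results;
    }

    public static void main(String[] args) {
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> task(0, "A"));
        tasks.add(() -> task(2000, "B")); // this one times out
        tasks.add(() -> task(0, "C"));
        tasks.add(() -> task(0, "D"));
        for (String s : runAll(tasks, 1000)) {
            System.out.println("RECEIVED: " + s);
        }
    }
}
```

As in the concatMap version, the tasks are passed as Callables so that none of them starts running before its own timeout is in place.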

Upvotes: 2
