O10

Reputation: 35

RxJava timeout without unsubscribing from source

I'm subscribing to a hot observable and applying the timeout operator to it. However, I do not want to unsubscribe from the source when a TimeoutException is thrown; I only want to emit a special item, because I know the source will eventually emit new items. How can I achieve that?

I tried combining timeout with onErrorReturn, but this still results in onComplete being called on the subscriber.

Upvotes: 1

Views: 573

Answers (1)

akarnokd

Reputation: 69997

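Use publish, timeout and retry (adapted from my older answer). The publish(selector) overload keeps a single subscription to the source, so the timeout fallback and the retry only resubscribe to the shared stream inside the selector: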

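// Sample source: each value is emitted after a delay equal to itself (in ms),
// which leaves a gap of 4 seconds between 1000 and 5000.
Observable<Long> source =
    Observable.just(100L, 200L, 500L, 1000L, 5000L, 5500L, 6000L)
    .flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS).map(a -> v));

// publish(selector) shares one subscription to the source inside the lambda.
source.publish(co ->
    // On timeout, emit the special item -1, then fail so that retry() kicks in.
    co.timeout(750, TimeUnit.MILLISECONDS,
         Observable.just(-1L)
         .concatWith(Observable.error(new RuntimeException()))
    )
    // retry() resubscribes to the shared co, not to the original source.
    .retry()
).blockingForEach(System.out::println);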
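
The same pattern can be checked against an actual hot source. The following is only a minimal sketch, assuming RxJava 2 package names (io.reactivex.*; RxJava 3 uses io.reactivex.rxjava3.*). The names TimeoutMarkerDemo, withTimeoutMarker and RestartTimeout are hypothetical and not part of RxJava. A PublishSubject stands in for the hot observable and a TestScheduler replaces real time. Unlike the snippet above, the sketch limits retry to its own marker exception, so that a genuine error from the source is not retried.

```java
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class TimeoutMarkerDemo {

    // Hypothetical marker exception: only signals "restart the timeout".
    static final class RestartTimeout extends RuntimeException { }

    // Emits `marker` whenever `source` is silent for `timeout`, while keeping
    // the single upstream subscription created by publish(selector).
    static <T> Observable<T> withTimeoutMarker(
            Observable<T> source, long timeout, TimeUnit unit,
            Scheduler scheduler, T marker) {
        return source.publish(shared ->
            shared.timeout(timeout, unit, scheduler,
                    Observable.just(marker)
                        .concatWith(Observable.error(new RestartTimeout())))
                // Retry only our own marker error, not errors from the source.
                .retry(e -> e instanceof RestartTimeout)
        );
    }

    static List<Long> run() {
        TestScheduler scheduler = new TestScheduler();
        PublishSubject<Long> hot = PublishSubject.create(); // stand-in hot source
        List<Long> seen = new ArrayList<>();

        withTimeoutMarker(hot, 750, TimeUnit.MILLISECONDS, scheduler, -1L)
            .subscribe(seen::add);

        hot.onNext(1L);
        // Virtual time passes the 750 ms timeout once, so -1 is emitted.
        scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);

        // The hot source must still have its subscriber after the timeout.
        if (!hot.hasObservers()) {
            throw new IllegalStateException("source was unsubscribed");
        }

        hot.onNext(2L);   // still delivered after the timeout
        hot.onComplete(); // completion of the source ends the sequence
        return seen;      // expected: [1, -1, 2]
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Passing the scheduler explicitly is a choice made for testability; with the three-argument timeout used above, the timer runs on the computation scheduler instead.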

Upvotes: 1
