Ziem
Ziem

Reputation: 6697

Timeout without ending stream

I'm using Android-ReactiveLocation library. I would like to display some warning message when for the last X minutes I didn't get any new location updates. New locations should reset timeout.

How can I implement this kind of timeout without ending stream using RxJava?

Upvotes: 0

Views: 163

Answers (1)

akarnokd
akarnokd

Reputation: 69997

You could publish the source and use a timeout with retry to keep listening to the source:

Observable<Long> source =
    Observable.just(100L, 200L, 500L, 1000L, 5000L, 5500L, 6000L)
    .flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS).map(a -> v));

source.publish(co -> co.timeout(750, TimeUnit.MILLISECONDS)
        .doOnError(System.out::println)
        .retry()
).toBlocking().forEach(System.out::println);

Upvotes: 2

Related Questions