Rx Timeout Operator

I want to emmit one item after x seconds if no items have been emited. I was trying to use the timeout operator. The problem is that timeout operator requires at least one item to be executed previously to start the countdown. From the documentation:

"If the next item isn't emitted within the specified timeout duration starting from its predecessor, the resulting Observable begins instead to mirror a fallback Observable."

That is not the behaviour I'm looking for. When I subscribe to my observable, I want to emit a particular item if a particular period of time elapses without any emitted items previously.

Example:

 getUserLocationFromGPS() //Sometimes I dont receive user location
.timeout(5, TimeUnit.SECONDS, Observable.just(getDefaultLocation())
.subscribe(...);

Upvotes: 4

Views: 2877

Answers (2)

Krzysztof Skowronek
Krzysztof Skowronek

Reputation: 2936

I don't use java-rx specifically, but here is an idea:

getUserLocationFromGPS()
.buffer(5 sec, 1 item) //not sure how to do this in rx-java
.map(x => { // this is select in .net
    if(x.IsEmpty)
      return getDefautLocation();
    else
      return x[0];
    }).Subscribe(...)

buffer will get you list of items every 5sec. You can specify it to return after only 1 item was emited by getUserLocationFromGPS()

map will check if the list is empty (no item was emited) and return proper value

Upvotes: 0

Benjamin
Benjamin

Reputation: 7368

final Observable data = yourApi.getData().take(5, TimeUnit.SECONDS);
final Observable fallback = Observable.just("Fallback");

Observable.concat(data, fallback).firstOrError()
            .subscribe(
                    System.out::println
            );
  1. If yourApi.getData() does not emit anything then you will receive data from your fallback observable.

  2. If yourApi.getData() timeouts then you will receive data from your fallback observable.

  3. If yourApi.getData() emits normally then you will receive data from it.

Upvotes: 4

Related Questions