RxJava Observable timeout before first element

I have a device that sends me pings, and I use an Observable for this. Before the first ping we start the connection, which takes some time, so I want the first ping to have a 10-second timeout. I do it this way:

public Observable<Ping> getPing() {
    ConnectableObservable<Ping> observable = device.connectToDevice().publish();

    Observable<Ping> firstWithTimeout = observable.take(1).timeout(10, TimeUnit.SECONDS);
    Observable<Ping> fromSecondWithoutTimeout = observable.skip(1);

    Observable<Ping> mergedObservable = firstWithTimeout.mergeWith(fromSecondWithoutTimeout)
            .doOnDispose(() -> disconnect(bluetoothDevice))
            .doOnError(error -> disconnect(bluetoothDevice));

    observable.connect();
    return mergedObservable;
}

For the test I use:

Subject<Ping> observable = PublishSubject.create();
when(device.connect()).thenReturn(observable);
TestObserver<Ping> testSubscriber = TestObserver.create();
getPing.subscribe(testSubscriber);
observable.onNext(new Ping());

testSubscriber.assertValueCount(1);

This test fails with a TimeoutException, even though I send the ping immediately.

Upvotes: 2

Views: 1637

Answers (2)

Dominik

Reputation: 87

There is an overloaded timeout operator that fits perfectly here:

timeout(ObservableSource<U> firstTimeoutIndicator, Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator)

Assuming that your observable reference is testObservable, you simply do:

testObservable.timeout(
        Observable.timer(5L, TimeUnit.SECONDS), // timeout for the first item
        ignored -> Observable.never() // no timeout for any subsequent item
)
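
To make the behaviour of this overload concrete, here is a minimal, self-contained sketch. It assumes RxJava 2.x (the io.reactivex packages) is on the classpath; the class name FirstItemTimeoutSketch, the helper firstItemTimeout and the String items are made up for illustration and are not part of the question's code. A TestScheduler drives virtual time so nothing actually waits.

```java
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.PublishSubject;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FirstItemTimeoutSketch {

    // Hypothetical helper: only the first item is subject to a timeout.
    static <T> Observable<T> firstItemTimeout(Observable<T> source, long seconds, Scheduler scheduler) {
        return source.timeout(
                Observable.timer(seconds, TimeUnit.SECONDS, scheduler), // first-item timeout
                ignored -> Observable.never());                         // later items never time out
    }

    // The first item arrives after 3 virtual seconds, the second a full minute later.
    static List<String> firstArrivesInTime() {
        TestScheduler scheduler = new TestScheduler();
        PublishSubject<String> subject = PublishSubject.create();
        TestObserver<String> observer = firstItemTimeout(subject, 5, scheduler).test();

        scheduler.advanceTimeBy(3, TimeUnit.SECONDS);
        subject.onNext("first");
        scheduler.advanceTimeBy(60, TimeUnit.SECONDS);
        subject.onNext("second");

        observer.assertNoErrors();
        return observer.values();
    }

    // No item arrives within the first 5 virtual seconds.
    static boolean firstArrivesTooLate() {
        TestScheduler scheduler = new TestScheduler();
        PublishSubject<String> subject = PublishSubject.create();
        TestObserver<String> observer = firstItemTimeout(subject, 5, scheduler).test();

        scheduler.advanceTimeBy(6, TimeUnit.SECONDS);
        subject.onNext("late"); // ignored: the stream has already failed

        List<Throwable> errors = observer.errors();
        return errors.size() == 1 && errors.get(0) instanceof TimeoutException;
    }

    public static void main(String[] args) {
        System.out.println(firstArrivesInTime());
        System.out.println(firstArrivesTooLate());
    }
}
```

Passing the scheduler in explicitly is a design choice for testability: production code could hand over Schedulers.computation(), while tests use a TestScheduler.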

Upvotes: 2

Sergej Isbrecht

Reputation: 4002

Please have a look at this setup:

JUnit5 / RxJava2

I think your error is an incorrect configuration of the mock:

when(device.connect()).thenReturn(observable);

Please have a look at my implementation. There is no need to use publish / connect when you are creating a new observable with every method invocation. Use autoConnect in the device's implementation of connectToDevice().

  Device device;

  @BeforeEach
  void setUp() {
    device = mock(Device.class);
  }

  // The ping arrives immediately, so exactly one value is expected.
  @Test
  void name() throws Exception {
    Subject<Ping> observable = PublishSubject.create();

    when(device.connectToDevice()).thenReturn(observable);

    TestObserver<Ping> test = getPing(Schedulers.computation()).test();
    observable.onNext(new Ping());

    test.assertValueCount(1);
  }

  // Virtual time passes the 10-second limit before the ping, so a TimeoutException is expected.
  @Test
  void name2() throws Exception {
    Subject<Ping> observable = PublishSubject.create();

    when(device.connectToDevice()).thenReturn(observable);

    TestScheduler testScheduler = new TestScheduler();
    TestObserver<Ping> test = getPing(testScheduler).test();

    testScheduler.advanceTimeBy(20, TimeUnit.SECONDS);

    observable.onNext(new Ping());

    test.assertError(TimeoutException.class);
  }

  // The scheduler is injected so that tests can control time.
  private Observable<Ping> getPing(Scheduler scheduler) {
    return device
        .connectToDevice()
        .take(1)
        .timeout(10, TimeUnit.SECONDS, scheduler)
        .doOnDispose(() -> disconnect())
        .doOnError(error -> disconnect());
  }

  private void disconnect() {}

  interface Device {
    Observable<Ping> connectToDevice();
  }

  class Ping {}

Upvotes: 1
