Reputation: 141
I have a device that sends me pings, and I use an Observable to receive them. Before the first ping arrives, the connection has to be established, which takes some time. I therefore want only the first ping to have a 10-second timeout. I implemented it this way:
public Observable<Ping> getPing() {
    ConnectableObservable<Ping> observable = device.connectToDevice().publish();
    Observable<Ping> firstWithTimeout = observable.take(1).timeout(10, TimeUnit.SECONDS);
    Observable<Ping> fromSecondWithoutTimeout = observable.skip(1);
    Observable<Ping> mergedObservable = firstWithTimeout.mergeWith(fromSecondWithoutTimeout)
            .doOnDispose(() -> disconnect(bluetoothDevice))
            .doOnError(error -> disconnect(bluetoothDevice));
    observable.connect();
    return mergedObservable;
}
For the test I use:
Subject<Ping> observable = PublishSubject.create();
when(device.connect()).thenReturn(observable);
TestObserver<Ping> testSubscriber = TestObserver.create();
getPing.subscribe(testSubscriber);
observable.onNext(new Ping());
testSubscriber.assertValueCount(1);
This test fails with a TimeoutException, even though I send the ping immediately.
Upvotes: 2
Views: 1637
Reputation: 87
There is an overloaded timeout operator that fits this case well:
timeout(ObservableSource<U> firstTimeoutIndicator, Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator)
Assuming that your observable reference is testObservable, you simply do:
testObservable.timeout(
    Observable.timer(5L, TimeUnit.SECONDS), // timeout that applies to the first item only
    ignored -> Observable.never()           // no timeout for any later item
)
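To make the semantics of that overload concrete, here is a minimal plain-Java analogy. It is not RxJava code: the class `FirstItemTimeout` and the method `receive` are hypothetical names invented for this illustration, and a `BlockingQueue` stands in for the stream of pings. The bounded `poll` plays the role of the first-item indicator (`Observable.timer(...)`), and the unbounded `take` plays the role of `Observable.never()` for every later item.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper for illustration only; it is not part of RxJava.
class FirstItemTimeout {

    // Reads `count` items (count >= 1) from `source`.
    // Only the wait for the first item is bounded by the timeout.
    static <T> List<T> receive(BlockingQueue<T> source, int count,
                               long firstTimeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        List<T> received = new ArrayList<>();

        // Bounded wait: poll returns null if nothing arrives in time.
        T first = source.poll(firstTimeout, unit);
        if (first == null) {
            throw new TimeoutException("no first item within " + firstTimeout + " " + unit);
        }
        received.add(first);

        // Unbounded wait for every later item: take blocks until one arrives.
        while (received.size() < count) {
            received.add(source.take());
        }
        return received;
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> pings = new LinkedBlockingQueue<>();
        pings.add("ping-1");
        pings.add("ping-2");
        System.out.println(receive(pings, 2, 10, TimeUnit.SECONDS));
    }
}
```

The point of the sketch is only that the deadline covers the gap before the first item and nothing after it. In the RxJava version the same split is expressed by the two arguments of `timeout`.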
Upvotes: 2
Reputation: 4002
Please have a look at this setup:
JUnit5 / RxJava2
I think your error is an incorrect configuration of the mock:
when(device.connect()).thenReturn(observable);
Please have a look at my implementation. There is no need to use publish / connect when you are creating a new observable with every method invocation. Use autoConnect in the Device implementation of connectToDevice() instead.
Device device;

@BeforeEach
void setUp() {
    device = mock(Device.class);
}

@Test
void name() throws Exception {
    Subject<Ping> observable = PublishSubject.create();
    when(device.connectToDevice()).thenReturn(observable);
    TestObserver<Ping> test = getPing(Schedulers.computation()).test();
    observable.onNext(new Ping());
    test.assertValueCount(1);
}

@Test
void name2() throws Exception {
    Subject<Ping> observable = PublishSubject.create();
    when(device.connectToDevice()).thenReturn(observable);
    TestScheduler testScheduler = new TestScheduler();
    TestObserver<Ping> test = getPing(testScheduler).test();
    testScheduler.advanceTimeBy(20, TimeUnit.SECONDS);
    observable.onNext(new Ping());
    test.assertError(TimeoutException.class);
}

private Observable<Ping> getPing(Scheduler scheduler) {
    return device
            .connectToDevice()
            .take(1)
            .timeout(10, TimeUnit.SECONDS, scheduler)
            .doOnDispose(() -> disconnect())
            .doOnError(error -> disconnect());
}

private void disconnect() {}

interface Device {
    Observable<Ping> connectToDevice();
}

class Ping {}
Upvotes: 1