Reputation: 851
How to write test case using StepVerifier for Flux interval and delayElements
I want to write StepVerifier for below scenarios.
Flux.interval(Duration.ofMillis(1000))
.onBackpressureDrop()
.flatMap(ignore -> doSomething())
Mono.just(1).repeat() // infinite Flux with backpressure
.delayElements(Duration.ofMillis(1000))
.concatMap(ignore -> doSomething())
Upvotes: 2
Views: 1100
Reputation: 5924
TL;DR You can use StepVerifier.withVirtualTime
to test time-based operators and avoid long delays. In addition, because stream is infinite, you would need to cancel subscription using thenCancel
at some point.
Here are some examples
@Test
void testDelayElements() {
StepVerifier.withVirtualTime(() ->
Mono.just(1).repeat() // infinite Flux with backpressure
.delayElements(Duration.ofMillis(10000))
.concatMap(ignore -> doSomething())
)
.expectSubscription()
.expectNoEvent(Duration.ofMillis(10000))
.expectNextCount(1)
.expectNoEvent(Duration.ofMillis(10000))
.expectNextCount(1)
.expectNoEvent(Duration.ofMillis(10000))
.expectNextCount(1)
.thenCancel()
.verify();
}
For more information, check Manipulating Time section in the Reactor 3 Reference Guide.
Very important point from the documentation
Take extra care to ensure the Supplier<Publisher> can be used in a lazy fashion. Otherwise, virtual time is not guaranteed. Especially avoid instantiating the Flux earlier in the test code and having the Supplier return that variable. Instead, always instantiate the Flux inside the lambda.
Note that the publisher is created lazily using the Supplier<Publisher<T>>
. For example, the following will not work as expected
@Test
void testDelayElements() {
var stream = Mono.just(1).repeat() // infinite Flux with backpressure
.delayElements(Duration.ofMillis(10000))
.concatMap(ignore -> doSomething());
StepVerifier.withVirtualTime(() -> stream)
....
}
Upvotes: 4