G SriHAri

Reputation: 851

How to write a StepVerifier to test Flux.interval and delayElements?

How do I write a test case with StepVerifier for Flux.interval and delayElements?

I want to write StepVerifier tests for the two scenarios below.

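// Scenario 1: periodic ticks, dropping ticks when downstream is slow
Flux.interval(Duration.ofMillis(1000))
    .onBackpressureDrop()
    .flatMap(ignore -> doSomething())

// Scenario 2: infinite source with a delay between elements
Mono.just(1).repeat() // infinite Flux with backpressure
    .delayElements(Duration.ofMillis(1000))
    .concatMap(ignore -> doSomething())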

Upvotes: 2

Views: 1100

Answers (1)

Alex

Reputation: 5924

TL;DR: Use StepVerifier.withVirtualTime to test time-based operators without waiting for the real delays. In addition, because the stream is infinite, you need to cancel the subscription with thenCancel at some point.

Here is an example:

@Test
void testDelayElements() {
    // Build the Flux inside the supplier so it picks up the virtual-time scheduler
    StepVerifier.withVirtualTime(() ->
                Mono.just(1).repeat() // infinite Flux with backpressure
                        .delayElements(Duration.ofMillis(10000))
                        .concatMap(ignore -> doSomething())
            )
            .expectSubscription()
            // each element arrives only after 10 s of virtual time
            .expectNoEvent(Duration.ofMillis(10000))
            .expectNextCount(1)
            .expectNoEvent(Duration.ofMillis(10000))
            .expectNextCount(1)
            .expectNoEvent(Duration.ofMillis(10000))
            .expectNextCount(1)
            .thenCancel() // the stream is infinite, so cancel instead of waiting for completion
            .verify();
}
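
The Flux.interval scenario from the question can be tested the same way. The following is only a sketch: doSomething() is a hypothetical stand-in that I assume returns a Mono<String>, and the class and method names are made up for illustration. It relies on Flux.interval(period) emitting its first tick after one full period.

```java
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

class IntervalVirtualTimeExample {

    // Hypothetical stand-in for the question's doSomething(); assumed to return a Mono.
    static Mono<String> doSomething() {
        return Mono.just("done");
    }

    // Runs the verification and returns the Duration reported by verify().
    static Duration verifyInterval() {
        return StepVerifier.withVirtualTime(() ->
                        // Assembled inside the supplier, so interval runs on the virtual-time scheduler
                        Flux.interval(Duration.ofMillis(1000))
                                .onBackpressureDrop()
                                .flatMap(ignore -> doSomething()))
                .expectSubscription()
                // Nothing is emitted before the first period has elapsed
                .expectNoEvent(Duration.ofMillis(1000))
                .expectNext("done")
                // Advance virtual time by two more periods, which should yield two more ticks
                .thenAwait(Duration.ofMillis(2000))
                .expectNext("done", "done")
                // The interval never completes, so cancel explicitly
                .thenCancel()
                .verify();
    }
}
```

thenAwait advances the virtual clock without asserting silence, which is convenient when you only care about how many elements show up after a given amount of time.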

For more information, see the "Manipulating Time" section of the Reactor 3 Reference Guide.

A very important point from the documentation:

Take extra care to ensure the Supplier<Publisher> can be used in a lazy fashion. Otherwise, virtual time is not guaranteed. Especially avoid instantiating the Flux earlier in the test code and having the Supplier return that variable. Instead, always instantiate the Flux inside the lambda.

Note that the publisher must be created lazily, inside the Supplier<Publisher<T>>. For example, the following will not work as expected, because the Flux is assembled before withVirtualTime is called:

@Test
void testDelayElements() {
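    // Wrong: the pipeline is assembled eagerly, outside the supplier
    var stream = Mono.just(1).repeat() // infinite Flux with backpressure
            .delayElements(Duration.ofMillis(10000))
            .concatMap(ignore -> doSomething());

    // The supplier only hands back the already-built Flux
    StepVerifier.withVirtualTime(() -> stream)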
            ....
}
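
If you want to keep the pipeline out of the test body, one option is to move it into a factory method and pass a method reference as the supplier, so the Flux is still assembled only when StepVerifier asks for it. This is a sketch under assumptions: createStream() and the class name are hypothetical, and doSomething() is again a stand-in that I assume returns a Mono<String>.

```java
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

class LazySupplierExample {

    // Hypothetical stand-in for doSomething(); assumed to return a Mono.
    static Mono<String> doSomething() {
        return Mono.just("done");
    }

    // Hypothetical factory: each call assembles a fresh pipeline.
    static Flux<String> createStream() {
        return Mono.just(1).repeat() // infinite Flux with backpressure
                .delayElements(Duration.ofMillis(10000))
                .concatMap(ignore -> doSomething());
    }

    // Runs the verification and returns the Duration reported by verify().
    static Duration verifyDelayElements() {
        // The method reference is invoked by StepVerifier, so assembly happens lazily
        return StepVerifier.withVirtualTime(LazySupplierExample::createStream)
                .expectSubscription()
                .expectNoEvent(Duration.ofMillis(10000))
                .expectNextCount(1)
                .thenAwait(Duration.ofMillis(10000))
                .expectNextCount(1)
                .thenCancel()
                .verify();
    }
}
```

The difference from the broken version is only where the factory call happens: here createStream() runs when the supplier is invoked, not earlier in the test method.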

Upvotes: 4
