Felipe
Felipe

Reputation: 7583

How to ensure that a Flux with infinite stream completes in a given time?

I have a Flux generator (from project reactor) for stream that delays 1 second to emit each element. And because the stream is infinite I also decide how many elements I would like to take using the take() method. I want to test that such a method can be executed in a given time. I tried to create my solution inspired by this answer but without success.

This is the Flux method

import reactor.core.publisher.Flux;
import java.time.Duration;
@Slf4j
public class FluxAndMonoStreams {
    public Flux<Double> createFluxDoubleWithDelay(long numberOfElements, long delaySec) {
        return Flux.interval(Duration.ofSeconds(delaySec))
                .map(l -> l.doubleValue())
                .take(numberOfElements)
                .log();
    }
}

and this is the unit test. I have tried the approaches using .thenConsumeWhile(value -> true), .expectNextCount(4), .expectNext(0.0, 1.0, 2.0, 3.0) after waiting 4 seconds on the .thenAwait(Duration.ofSeconds(4)). But none of them works.

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
public class FluxAndMonoStreamsTest {
    FluxAndMonoStreams myFluxAndMonoStreams = new FluxAndMonoStreams();
    @Test
    void testCreateFluxStreamWithMapAndVerifyDelay() {
        Flux<Double> streamLongFlux = myFluxAndMonoStreams
                .createFluxDoubleWithDelay(4, 1);

        Scheduler scheduler = Schedulers.newSingle("test");
        AtomicInteger incrementer = new AtomicInteger();

        StepVerifier
                .withVirtualTime(() -> streamLongFlux
                        .subscribeOn(scheduler)
                        .doOnNext(value -> incrementer.incrementAndGet())
                )
                .expectSubscription()
                .thenAwait(Duration.ofSeconds(4))
                // .thenConsumeWhile(value -> true)
                // .expectNextCount(4)
                // .expectNext(0.0, 1.0, 2.0, 3.0)
                .verifyComplete()
        ;
    }
}

I am receiving this error which says that I am not consuming values 1.0, 2.0, ... Basically I want to make sure that I consume only the values 0.0, 1.0, 2.0, 3.0 within 4 seconds. How do I do that?

12:37:31.853 [Test worker] DEBUG reactor.util.Loggers - Using Slf4j logging framework
12:37:31.912 [test-1] INFO reactor.Flux.Take.1 - onSubscribe(FluxTake.TakeSubscriber)
12:37:31.915 [test-1] INFO reactor.Flux.Take.1 - request(unbounded)
12:37:32.917 [parallel-1] INFO reactor.Flux.Take.1 - onNext(0.0)
12:37:32.921 [parallel-1] INFO reactor.Flux.Take.1 - cancel()

expectation "expectComplete" failed (expected: onComplete(); actual: onNext(0.0))
java.lang.AssertionError: expectation "expectComplete" failed (expected: onComplete(); actual: onNext(0.0))

Additional explain: Just to clarify one more time the behavior that I want. If I uncomment .expectNext(0.0, 1.0, 2.0, 3.0) and replace 4 seconds to 2 seconds in .thenAwait(Duration.ofSeconds(2)), the test should fail. But it is not.

Upvotes: 1

Views: 2057

Answers (2)

Felipe
Felipe

Reputation: 7583

someone else helped me and I am posting the solution here. I am using JUnit 5 timeout. Using @Timeout(value = 5, unit = TimeUnit.SECONDS) the test pass, if I use 4 seconds or less the test fails. The solution is not based on StepVerifier but still achieves its purposes.

    @Test
    @Timeout(value = 5, unit = TimeUnit.SECONDS)
    void testCreateFluxStreamWithMapAndVerifyDelay() {
        Flux<Double> streamLongFlux = myFluxAndMonoStreams
                .createFluxDoubleWithDelay(4, 1);

        StepVerifier.create(streamLongFlux)
                .expectSubscription()
                .expectNext(0.0, 1.0, 2.0, 3.0)
                .verifyComplete();
    }

Upvotes: 0

p.streef
p.streef

Reputation: 3815

This test runs "fine" for me if I uncomment the expectNext.

    public Flux<Double> create(long numberOfElements, long delaySec) {
        return Flux.interval(Duration.ofSeconds(delaySec)).map(l -> l.doubleValue()).take(numberOfElements).log();
    }

    @Test
    public void virtualTimeTest() {
        Flux<Double> streamLongFlux = create(4,1);

        Scheduler scheduler = Schedulers.newSingle("test");
        AtomicInteger incrementer = new AtomicInteger();

        StepVerifier
                .withVirtualTime(() -> streamLongFlux
                        .subscribeOn(scheduler)
                        .doOnNext(value -> incrementer.incrementAndGet())
                )
                .expectSubscription()
                .thenAwait(Duration.ofSeconds(4))
                // .thenConsumeWhile(value -> true)
                // .expectNextCount(4)
                 .expectNext(0.0, 1.0, 2.0, 3.0)
                .verifyComplete()
        ;
    }

It's not running in virtual time because the Flux.interval is called before virtual time is established.

 StepVerifier.withVirtualTime(() -> create(4,1))
                    .expectSubscription()
                    .thenAwait(Duration.ofSeconds(4))
                    .expectNext(0.0, 1.0, 2.0, 3.0)
                    .verifyComplete();

This does run in virtual time and takes 400 ms in stead of 4.4 seconds

Another even more explicit test would be:

@Test
    public void virtualTime(){
        StepVerifier.withVirtualTime(() -> create(4,1))
                    .expectSubscription()
                    .expectNoEvent(Duration.ofSeconds(1))
                    .expectNext(0.0)
                    .expectNoEvent(Duration.ofSeconds(1))
                    .expectNext(1.0)
                    .expectNoEvent(Duration.ofSeconds(1))
                    .expectNext(2.0)
                    .expectNoEvent(Duration.ofSeconds(1))
                    .expectNext(3.0)
                    .verifyComplete();
    }

Upvotes: 1

Related Questions