Reputation: 7583
I have a Flux generator (from Project Reactor) that emits one element per second. Because the stream is infinite, I limit the number of elements with the take() method. I want to test that the resulting stream completes within a given time. I tried to build a solution inspired by this answer, but without success.
This is the Flux method:
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import java.time.Duration;

@Slf4j
public class FluxAndMonoStreams {
    // Emits 0.0, 1.0, 2.0, ... every delaySec seconds and completes after numberOfElements values.
    public Flux<Double> createFluxDoubleWithDelay(long numberOfElements, long delaySec) {
        return Flux.interval(Duration.ofSeconds(delaySec))
                .map(l -> l.doubleValue())
                .take(numberOfElements)
                .log();
    }
}
And this is the unit test. I have tried .thenConsumeWhile(value -> true), .expectNextCount(4), and .expectNext(0.0, 1.0, 2.0, 3.0) after waiting 4 seconds with .thenAwait(Duration.ofSeconds(4)), but none of them works.
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

public class FluxAndMonoStreamsTest {
    FluxAndMonoStreams myFluxAndMonoStreams = new FluxAndMonoStreams();

    @Test
    void testCreateFluxStreamWithMapAndVerifyDelay() {
        Flux<Double> streamLongFlux = myFluxAndMonoStreams
                .createFluxDoubleWithDelay(4, 1);
        Scheduler scheduler = Schedulers.newSingle("test");
        AtomicInteger incrementer = new AtomicInteger();
        StepVerifier
                .withVirtualTime(() -> streamLongFlux
                        .subscribeOn(scheduler)
                        .doOnNext(value -> incrementer.incrementAndGet())
                )
                .expectSubscription()
                .thenAwait(Duration.ofSeconds(4))
                // .thenConsumeWhile(value -> true)
                // .expectNextCount(4)
                // .expectNext(0.0, 1.0, 2.0, 3.0)
                .verifyComplete();
    }
}
I am receiving the error below, which indicates that I am not consuming the emitted values. Basically, I want to make sure that I consume only the values 0.0, 1.0, 2.0, 3.0 within 4 seconds. How do I do that?
12:37:31.853 [Test worker] DEBUG reactor.util.Loggers - Using Slf4j logging framework
12:37:31.912 [test-1] INFO reactor.Flux.Take.1 - onSubscribe(FluxTake.TakeSubscriber)
12:37:31.915 [test-1] INFO reactor.Flux.Take.1 - request(unbounded)
12:37:32.917 [parallel-1] INFO reactor.Flux.Take.1 - onNext(0.0)
12:37:32.921 [parallel-1] INFO reactor.Flux.Take.1 - cancel()
expectation "expectComplete" failed (expected: onComplete(); actual: onNext(0.0))
java.lang.AssertionError: expectation "expectComplete" failed (expected: onComplete(); actual: onNext(0.0))
Additional explanation: to clarify the behavior I want, if I uncomment .expectNext(0.0, 1.0, 2.0, 3.0) and change the wait from 4 seconds to 2 seconds with .thenAwait(Duration.ofSeconds(2)), the test should fail. But it does not.
Upvotes: 1
Views: 2057
Reputation: 7583
Someone else helped me, and I am posting the solution here. I am using the JUnit 5 timeout. With @Timeout(value = 5, unit = TimeUnit.SECONDS) the test passes; if I use 4 seconds or less, the test fails. The timing check does not rely on StepVerifier, but it still achieves its purpose.
// Additional imports needed for this version:
import org.junit.jupiter.api.Timeout;
import java.util.concurrent.TimeUnit;

@Test
@Timeout(value = 5, unit = TimeUnit.SECONDS)
void testCreateFluxStreamWithMapAndVerifyDelay() {
    Flux<Double> streamLongFlux = myFluxAndMonoStreams
            .createFluxDoubleWithDelay(4, 1);
    StepVerifier.create(streamLongFlux)
            .expectSubscription()
            .expectNext(0.0, 1.0, 2.0, 3.0)
            .verifyComplete();
}
Upvotes: 0
Reputation: 3815
This test runs "fine" for me if I uncomment the expectNext.
public Flux<Double> create(long numberOfElements, long delaySec) {
    return Flux.interval(Duration.ofSeconds(delaySec))
            .map(l -> l.doubleValue())
            .take(numberOfElements)
            .log();
}

@Test
public void virtualTimeTest() {
    Flux<Double> streamLongFlux = create(4, 1);
    Scheduler scheduler = Schedulers.newSingle("test");
    AtomicInteger incrementer = new AtomicInteger();
    StepVerifier
            .withVirtualTime(() -> streamLongFlux
                    .subscribeOn(scheduler)
                    .doOnNext(value -> incrementer.incrementAndGet())
            )
            .expectSubscription()
            .thenAwait(Duration.ofSeconds(4))
            // .thenConsumeWhile(value -> true)
            // .expectNextCount(4)
            .expectNext(0.0, 1.0, 2.0, 3.0)
            .verifyComplete();
}
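However, it is not running in virtual time, because Flux.interval is called (inside create(4, 1), outside the supplier) before StepVerifier has established virtual time. The interval therefore ticks on the real parallel scheduler, and the test waits in real time. Creating the Flux inside the supplier fixes this: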
StepVerifier.withVirtualTime(() -> create(4, 1))
        .expectSubscription()
        .thenAwait(Duration.ofSeconds(4))
        .expectNext(0.0, 1.0, 2.0, 3.0)
        .verifyComplete();
This does run in virtual time and takes 400 ms instead of 4.4 seconds.
Another even more explicit test would be:
@Test
public void virtualTime() {
    StepVerifier.withVirtualTime(() -> create(4, 1))
            .expectSubscription()
            .expectNoEvent(Duration.ofSeconds(1))
            .expectNext(0.0)
            .expectNoEvent(Duration.ofSeconds(1))
            .expectNext(1.0)
            .expectNoEvent(Duration.ofSeconds(1))
            .expectNext(2.0)
            .expectNoEvent(Duration.ofSeconds(1))
            .expectNext(3.0)
            .verifyComplete();
}
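To illustrate why the Flux has to be created inside the supplier, here is a minimal plain-Java sketch of the same pattern. It is only an analogy and not Reactor's implementation: VirtualClockSketch, Ticker, and withVirtualClock are hypothetical names, and the assumption is simply that an object captures whichever default clock is installed at the moment it is constructed.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Simplified model of a swappable default clock. NOT Reactor code; all names are hypothetical.
final class VirtualClockSketch {
    static final LongSupplier REAL = System::currentTimeMillis;
    private static LongSupplier current = REAL;

    // Installs the virtual clock first, then invokes the factory, then restores the previous clock.
    static <T> T withVirtualClock(LongSupplier virtual, Supplier<T> factory) {
        LongSupplier previous = current;
        current = virtual;
        try {
            return factory.get();
        } finally {
            current = previous;
        }
    }

    // Captures whichever clock is the default at construction time and keeps it.
    static final class Ticker {
        private final LongSupplier clock;

        Ticker() {
            this.clock = current;
        }

        boolean usesRealClock() {
            return clock == REAL;
        }
    }

    public static void main(String[] args) {
        LongSupplier virtual = () -> 0L;

        // Built before the swap, like calling create(4, 1) outside the supplier.
        Ticker eager = new Ticker();
        Ticker eagerResult = withVirtualClock(virtual, () -> eager);

        // Built inside the supplier, after the virtual clock is installed.
        Ticker lazyResult = withVirtualClock(virtual, Ticker::new);

        System.out.println(eagerResult.usesRealClock()); // prints "true"
        System.out.println(lazyResult.usesRealClock());  // prints "false"
    }
}
```

The eager instance keeps the real clock even though it is handed back from inside the supplier, which mirrors the test that passes but still waits in real time.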
Upvotes: 1