Reputation: 3
I have a method which takes a Flux as a parameter. Inside the method it then configures the Flux and another method subscribes to it.
public class StringProcessor {
private final stringParsingService stringParsingService;
public void subscribeStringFlux(Flux<String> stringFlux) {
fluxConfiguration(stringFlux)
.subscribe();
}
Flux<String> fluxConfiguration(Flux<String> stringFlux) {
return stringFlux
.filter(stringValidatorr::isValidString)
.doOnNext(itemString -> {
List<String> values = stringParsingService.parseValues(itemString);
})
.onErrorContinue((e,object) -> log.error(e.getClass().toString()+" "+e.getMessage()));
}
}
I am trying to test that the code in doOnNext is executed. I've tried to use StepVerifier (with Mockito for the service) however it never seems to enter the code during the test when I put breakpoints in stringParsingService.parseValue(). However, the code does run and execute as expected though when not being run in a test with real data.
My question is, how do you write tests that cover actions taken in a Flux.doOnNext()? Is there a way to use StepVerifier that will get it to execute code in doOnNext()? I've searched for days, and tried multiple approaches and so far none of them have worked.
The only way I've found so far that even comes close is to do the following (however this of course doesn't count for code coverage):
Flux<String> testStringFlux = Flux.just("a_test_string");
StepVerifier.create(testStringFlux)
.consumeNextWith(itemString -> {
List<String> values = stringParsingService.parseValues(itemString);
})
.verifyComplete();
Upvotes: 0
Views: 2488
Reputation: 28301
doOnNext
is a side-effet operator, meaning that the work it performs isn't really visible from the perspective of the main reactive sequence (the Flux<T>
). As a result, none of the tools that are used to test a Flux
can really see and test the side-effect, unless YOU make it testable explicitly.
One possible way would be to make the StringProcessor.stringParsingService
used in your test a mock, or a test-specific instance that records parsed strings, and then assert that at the end of the Flux
sequence.
Note that your doOnNext
computes a List
, but that list isn't used after that. The elements emitted by someFlux.doOnNext(function)
are exactly the same as the ones emitted by someFlux
, independently of what is done inside the function
(unless the function
throws, but that's a different story).
Upvotes: 2
Reputation: 1024
if you configure your flux as follows, it will stop at a breakpoint on the parseValues function. But I am not sure if this is what you are asking for...
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>flux_test</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.8</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
<version>3.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
</project>
StringProcessor.java
import org.apache.log4j.Logger;
import reactor.core.publisher.Flux;
import java.util.Arrays;
import java.util.List;
public class StringProcessor {
private final static Logger log = Logger.getLogger(StringProcessor.class.getName());
private class StringParsingService {
List<String> parseValues(String str) {
return Arrays.asList(str.split(","));
}
}
private static class StringValidator {
static boolean isValidString(String str) {
return str.length() > 1;
}
}
private final StringParsingService stringParsingService = new StringParsingService();
public void subscribeStringFlux(Flux<String> stringFlux) {
fluxConfiguration(stringFlux)
.subscribe();
}
Flux<String> fluxConfiguration(Flux<String> stringFlux) {
return stringFlux
.filter(StringValidator::isValidString)
.doOnNext(itemString -> {
List<String> values = stringParsingService.parseValues(itemString);
})
.onErrorContinue((e, object) -> log.error(e.getClass().toString() + " " + e.getMessage()));
}
}
TestFlux.java
import org.junit.Test;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;
public class TestFlux {
final TestPublisher<String> testPublisher = TestPublisher.create();
@Test
public void testFlux() {
StepVerifier.create(new StringProcessor().fluxConfiguration(testPublisher.flux()))
.then(() -> testPublisher.emit("aa,bb", "cc,dd"))
.expectNext("aa,bb")
.expectNext("cc,dd")
.verifyComplete();
}
}
Upvotes: 0