rcn
rcn

Reputation: 3

Unit Testing Flux.doOnNext()

I have a method which takes a Flux as a parameter. Inside the method it then configures the Flux and another method subscribes to it.

public class StringProcessor {

  private final stringParsingService stringParsingService;

  public void subscribeStringFlux(Flux<String> stringFlux) {
    fluxConfiguration(stringFlux)
        .subscribe();
  }

  Flux<String> fluxConfiguration(Flux<String> stringFlux) {
    return stringFlux
        .filter(stringValidatorr::isValidString)
        .doOnNext(itemString -> {
          List<String> values = stringParsingService.parseValues(itemString);
        })
        .onErrorContinue((e,object) -> log.error(e.getClass().toString()+" "+e.getMessage()));
  }
}

I am trying to test that the code in doOnNext is executed. I've tried to use StepVerifier (with Mockito for the service) however it never seems to enter the code during the test when I put breakpoints in stringParsingService.parseValue(). However, the code does run and execute as expected though when not being run in a test with real data.

My question is, how do you write tests that cover actions taken in a Flux.doOnNext()? Is there a way to use StepVerifier that will get it to execute code in doOnNext()? I've searched for days, and tried multiple approaches and so far none of them have worked.

The only way I've found so far that even comes close is to do the following (however this of course doesn't count for code coverage):

    Flux<String> testStringFlux = Flux.just("a_test_string");

    StepVerifier.create(testStringFlux)
        .consumeNextWith(itemString -> {
           List<String> values = stringParsingService.parseValues(itemString);
        })
        .verifyComplete();

Upvotes: 0

Views: 2488

Answers (2)

Simon Basl&#233;
Simon Basl&#233;

Reputation: 28301

doOnNext is a side-effet operator, meaning that the work it performs isn't really visible from the perspective of the main reactive sequence (the Flux<T>). As a result, none of the tools that are used to test a Flux can really see and test the side-effect, unless YOU make it testable explicitly.

One possible way would be to make the StringProcessor.stringParsingService used in your test a mock, or a test-specific instance that records parsed strings, and then assert that at the end of the Flux sequence.

Note that your doOnNext computes a List, but that list isn't used after that. The elements emitted by someFlux.doOnNext(function) are exactly the same as the ones emitted by someFlux, independently of what is done inside the function (unless the function throws, but that's a different story).

Upvotes: 2

Kemal Kaplan
Kemal Kaplan

Reputation: 1024

if you configure your flux as follows, it will stop at a breakpoint on the parseValues function. But I am not sure if this is what you are asking for...

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flux_test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.4.8</version>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
            <version>3.2.3.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

</project>

StringProcessor.java

import org.apache.log4j.Logger;
import reactor.core.publisher.Flux;

import java.util.Arrays;
import java.util.List;

public class StringProcessor {

    private final static Logger log = Logger.getLogger(StringProcessor.class.getName());

    private class StringParsingService {
        List<String> parseValues(String str) {
            return Arrays.asList(str.split(","));
        }
    }

    private static class StringValidator {
        static boolean isValidString(String str) {
            return str.length() > 1;
        }
    }

    private final StringParsingService stringParsingService = new StringParsingService();

    public void subscribeStringFlux(Flux<String> stringFlux) {
        fluxConfiguration(stringFlux)
                .subscribe();
    }

    Flux<String> fluxConfiguration(Flux<String> stringFlux) {
        return stringFlux
                .filter(StringValidator::isValidString)
                .doOnNext(itemString -> {
                    List<String> values = stringParsingService.parseValues(itemString);
                })
                .onErrorContinue((e, object) -> log.error(e.getClass().toString() + " " + e.getMessage()));
    }

}

TestFlux.java

import org.junit.Test;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;

public class TestFlux {

    final TestPublisher<String> testPublisher = TestPublisher.create();

    @Test
    public void testFlux() {
        StepVerifier.create(new StringProcessor().fluxConfiguration(testPublisher.flux()))
                .then(() -> testPublisher.emit("aa,bb", "cc,dd"))
                .expectNext("aa,bb")
                .expectNext("cc,dd")
                .verifyComplete();
    }

}

Upvotes: 0

Related Questions