Tuomas Toivonen

Reputation: 23492

Spring WebFlux & Mockito: How to verify the stream was consumed and participating handlers were invoked

I'm experimenting with Spring on the reactive stack, and I'm relatively new to reactive streams in Java. I'm trying to wrap my head around how to test and verify object collaboration in reactive streams. I'm using the spring-test and reactor-test libraries for TDD.

Here is the flawed test

@Test
void givenWeatherService_whenGetWeather_thenPublishWeatherAsync() throws Exception {
    
    Mono<Sample> mono = service.updateWeatherFeed();
    
    RecordedRequest request = server.takeRequest();
    assertThat(request.getMethod()).isEqualTo("GET");
    assertThat(request.getPath()).isEqualTo("/api/weather");
    
    StepVerifier.create(mono)
        .assertNext(sample -> {
            verify(publisher, times(1)).publish(sample);
            verify(sensorRepository, times(1)).insertSample(sample);
        }).verifyComplete();
}

And here is the service method implementation

public Mono<Sample> updateWeatherFeed() {

    // Uses WebClient to return the stream
    Mono<Sample> mono = getWeather();
    
    mono.doOnNext(sample -> {
        sensorRepository.insertSample(sample);
        samplePublisher.publish(sample);
    }).subscribe();
    
    return mono;
}

The StepVerifier hangs with the following log:

10:21:01.827 [reactor-http-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x81e9584a, L:/127.0.0.1:62510 - R:localhost/127.0.0.1:62479] Channel cleaned, now 1 active connections and 1 inactive connections

What is the correct way to verify the stream was consumed and participating handlers were invoked in order?

Upvotes: 1

Views: 4461

Answers (1)

Toerktumlare

Reputation: 14732

Your problem is probably here:

public Mono<Sample> updateWeatherFeed() {

    // Uses WebClient to return the stream
    Mono<Sample> mono = getWeather();
    
    mono.doOnNext(sample -> {
        sensorRepository.insertSample(sample);
        samplePublisher.publish(sample);
    }).subscribe(); // <--- right here
    
    return mono;
}

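You are subscribing inside the service, and thus consuming the stream there. If you look at the API for Mono#subscribe, you can see that it returns a Disposable: a handle for cancelling that subscription, not something another consumer can subscribe to. Note also that the method returns the original mono, not the chain with doOnNext attached. When the test subscribes to the returned Mono, the side effects are not part of that pipeline, and, assuming getWeather() is cold (as WebClient pipelines are), the subscription triggers a second HTTP request. That would explain the hang if the mock server has only one response enqueued.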

Reactive streams consist of a producer and a consumer. The consumer subscribes to the producer, and the producer then starts publishing. Whoever initiates the call is the consumer, and hence should be the one to subscribe.

In your case, the test is the one initiating the call, so the test is the consumer and should be the one subscribing.

If we look at StepVerifier#create, we can see that it takes a Publisher, and a publisher is a non-consumed Mono or Flux, not a Disposable.
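To make the laziness concrete, here is a minimal plain-Java sketch. ColdSource is a hypothetical stand-in written for this illustration; it is not Reactor's API and only models two properties of a cold Mono: an operator such as doOnNext returns a new pipeline and leaves the original untouched, and every subscription re-runs the source. The run() method mirrors the shape of the original updateWeatherFeed(): it subscribes internally to the decorated chain, then a second caller subscribes to the bare source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

class ColdSourceDemo {

    /** Hypothetical model of a cold Mono; NOT Reactor's API. */
    static final class ColdSource<T> {
        private final Supplier<T> source;

        ColdSource(Supplier<T> source) {
            this.source = source;
        }

        /** Returns a NEW pipeline with the side effect attached; this instance is unchanged. */
        ColdSource<T> doOnNext(Consumer<T> sideEffect) {
            return new ColdSource<>(() -> {
                T value = source.get();
                sideEffect.accept(value);
                return value;
            });
        }

        /** Each call re-runs the source from scratch, like subscribing to a cold publisher. */
        T subscribe() {
            return source.get();
        }
    }

    /** Mirrors the original service: subscribe internally, then hand out the bare source. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ColdSource<String> mono = new ColdSource<>(() -> {
            log.add("request"); // stands in for the HTTP call
            return "sample";
        });

        // Internal subscription on the decorated chain: one request, one side effect.
        mono.doOnNext(s -> log.add("insert " + s)).subscribe();

        // The caller subscribes to the original reference: a second request, no side effect.
        mono.subscribe();

        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [request, insert sample, request]
    }
}
```

In this model the second subscription performs the request again and never reaches the side effect, which is the same shape as a test subscribing to a Mono that was returned without its doOnNext chain.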

So how do we fix this? You need to remove the subscribe() and return the chain itself:

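public Mono<Sample> updateWeatherFeed() {
    // Return the chain that carries the side effects; the caller subscribes.
    // doOnNext passes the sample through unchanged (flatMap would need to return a Mono).
    return getWeather().doOnNext(sample -> {
        sensorRepository.insertSample(sample);
        samplePublisher.publish(sample);
    });
}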

Then you don't strictly need the StepVerifier, since you are returning a Mono, which emits at most one item, and the test can simply block() on it. If you were testing a Flux, you could use the StepVerifier to "step through" it.

// Set up mocks and such

@Test
void doTest() throws Exception {

    final Service service = new Service(publisherMock, sensorRepositoryMock);
    final Sample sample = service.updateWeatherFeed().block();
    
    // Assert Sample
    assertEquals(sample, ...);

    // Verify mocks
    verify(publisherMock, times(1)).publish(sample);
    verify(sensorRepositoryMock, times(1)).insertSample(sample);
}

It's been a while since I did this, and I wrote it freehand, so I hope you at least get the gist of it.

Upvotes: 2
