Reputation: 23492
I'm experimenting with Spring on Reactive stack. I'm relatively new to reactive streams in Java. I'm tyring to wrap my head around how to test and verify object collaboration in reactive streams. I'm using spring-test and reactor-test libraries for TDD testing.
Here is the flawed test
@Test
void givenWeatherService_whenGetWeather_thenPublishWeatherAsync() throws Exception {
Mono<Sample> mono = service.updateWeatherFeed();
RecordedRequest request = server.takeRequest();
assertThat(request.getMethod()).isEqualTo("GET");
assertThat(request.getPath()).isEqualTo("/api/weather");
StepVerifier.create(mono)
.assertNext(sample -> {
verify(publisher, times(1)).publish(sample);
verify(sensorRepository, times(1)).insertSample(sample);
}).verifyComplete();
}
And here is the service method implementation
public Mono<Sample> updateWeatherFeed() {
// Uses WebClient to return the stream
Mono<Sample> mono = getWeather();
mono.doOnNext(sample -> {
sensorRepository.insertSample(sample);
samplePublisher.publish(sample);
}).subscribe();
return mono;
}
The StepVerifier hangs with the following log:
10:21:01.827 [reactor-http-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x81e9584a, L:/127.0.0.1:62510 - R:localhost/127.0.0.1:62479] Channel cleaned, now 1 active connections and 1 inactive connections
What is the correct way to verify the stream was consumed and participating handlers were invoked in order?
Upvotes: 1
Views: 4461
Reputation: 14732
Your problem is probably here:
public Mono<Sample> updateWeatherFeed() {
// Uses WebClient to return the stream
Mono<Sample> mono = getWeather();
mono.doOnNext(sample -> {
sensorRepository.insertSample(sample);
samplePublisher.publish(sample);
}).subscribe(); // <--- right here
return mono;
}
You see you are subscribing and thus consuming the stream. If you look in the api for Mono#subscribe you can see that it returns a disposable. Which means it has been consumed, and is ready to be disposed.
Reactive streams consists of a producer
and a consumer
. The consumer
subscribes
to the producer
and starts publishing
. Whomever initiates the call is the consumer
hence should be the one to subscribe
.
In your case, the test is the one initiating the call, so the test is the consumer
and should be the one subscribing
.
If we look at the StepVerifier#create we can see that it takes a Publisher
and a publisher is a non/consumed Mono
or Flux
, not a Disposable
.
So how do we fix this, well you need to remove the subscribe()
public Mono<Sample> updateWeatherFeed() {
return getWeather().flatMap(sample -> {
sensorRepository.insertSample(sample);
samplePublisher.publish(sample);
return sample;
});
}
Then you dont need the StepVerifier
since you are returning a Mono
which is just one item. If you where testing a Flux
you can use the StepVerifier
to "step through" your Flux
.
// Set up mocks and such
@Test
void doTest() throws Exception {
final Service service = new Service(publisherMock, sensorRepositoryMock);
final Sample sample = service.updateWeatherFeed().block();
// Assert Sample
assertEquals(sample, ...);
// Verify mocks
verify(publisherMock, times(1)).publish(sample);
verify(sensorRepositoryMock, times(1)).insertSample(sample);
}
It's been a while i did this but i wrote this by free hand so i hope you at least the the gist of it.
Upvotes: 2