Reputation: 95
We encountered the following behaviour, which we do understand. However, we'd like to find out whether it is expected and whether it would be worth documenting as a pitfall.
We're experimenting with Spring Boot 2/Spring WebFlux and set up a small application that basically has something like this (all shortened):
    @PostMapping
    public Mono<Person> addPerson( @RequestBody Person person ) {
        return personService.addPerson( person );
    }
The service initially looked like this, because we also want to publish the person-added event to a message queue:
    public class PersonService {
        public Mono<Person> addPerson( Person person ) {
            Mono<Person> addedPerson = personRepository.save( person );
            addedPerson.subscribe( p -> rabbitTemplate.convertAndSend( "persons", p ) );
            return addedPerson;
        }
    }
Doing it like this is obviously wrong. The .subscribe() call triggers the flow, and we assume that the reactive REST controller does the same in the background before serializing the data for the response, resulting in a second, parallel flow. We ended up with two duplicate entries in the persons collection in the database.
After this lengthy introduction, finally the question: is it expected behaviour that multiple subscribers trigger multiple inserts (basically, if you subscribe n times you get n inserts)?
If so, this might be a pitfall worth emphasizing for beginners, especially if our understanding is correct that the reactive REST controllers perform a .subscribe() under the hood.
Upvotes: 0
Views: 3249
Reputation: 18119
You reached the right conclusion yourself: what you describe is the expected behavior.
The reactive programming model differs from an imperative programming model in various areas.
Imperative programming combines transformations, mapping, execution, and other aspects. You express these by creating conditional/loop flows, method invocations that may return values and pass values on to API calls.
Reactive programming decouples the declaration of what is happening from how it's going to be executed. Execution using reactive infrastructure is divided into two parts: Reactive sequence composition and the actual execution. In your code, you only compose reactive sequences. Execution happens outside of your code.
When you compose a Publisher, the resulting Publisher contains a declaration of things that will happen if it is executed. A Publisher does not imply whether it's going to be executed in the first place, nor how many subscribers will eventually subscribe.
Taking the example from above, Mono<Person> PersonRepository.save(…) returns a publisher that:
1. converts the Person to a Document,
2. sends the Document to MongoDB, and
3. emits the saved Person once a response from MongoDB comes back.
It's a recipe for saving data using a specific repository method. Creating the publisher does not execute it, and the publisher has no opinion on the number of executions. Multiple calls to .subscribe() execute the publisher multiple times.
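To make the "recipe" idea concrete, here is a minimal plain-Java sketch. It deliberately avoids Reactor: Recipe, save, and the in-memory persons list are hypothetical stand-ins and not part of any library, and a real Mono does far more (asynchrony, error and completion signals). The sketch only illustrates that declaring the work runs nothing and that every subscribe call runs the work again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class ColdRecipeDemo {

    // Hypothetical in-memory stand-in for the "persons" collection.
    static final List<String> persons = new ArrayList<>();

    // A toy "cold publisher": it stores the work and runs it once per subscribe call.
    static final class Recipe<T> {
        private final Supplier<T> work;

        Recipe(Supplier<T> work) {
            this.work = work;
        }

        void subscribe(Consumer<T> onNext) {
            onNext.accept(work.get());
        }
    }

    // Declares the insert; nothing is written until someone subscribes.
    static Recipe<String> save(String person) {
        return new Recipe<>(() -> {
            persons.add(person);
            return person;
        });
    }

    // Subscribes the given number of times and returns how many rows were inserted.
    static int insertsAfterSubscribing(int subscriptions) {
        persons.clear();
        Recipe<String> addedPerson = save("Alice");
        for (int i = 0; i < subscriptions; i++) {
            addedPerson.subscribe(p -> { });
        }
        return persons.size();
    }

    public static void main(String[] args) {
        System.out.println(insertsAfterSubscribing(0));
        System.out.println(insertsAfterSubscribing(2));
    }
}
```

Calling insertsAfterSubscribing(2) yields two inserts, which mirrors the duplicate entries from the question: one subscription made by hand in the service, one made by the framework.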
I'd argue .subscribe() is not a pitfall. The reactive programming model takes execution out of your hands. If you call .subscribe() or .block(), you should have a very good reason for doing so. Every time you see .subscribe() or .block() in your code, check carefully whether that's the right thing to do. Your execution environment is in charge of subscribing to Publishers.
A few observations:
- RabbitTemplate is a blocking API. You should not mix reactive and blocking APIs. If you have no other option, offload blocking calls to a worker. Either use publishOn(…) with a Scheduler before the operator containing the blocking work, or use ExecutorService/CompletableFuture together with flatMap(…).
- Use .flatMap(…) operators for reactive flow composition of Mono/Flux. The flatMap(…) operator starts non-blocking subprocesses that complete eventually and continue the flow.
- Use doOnXXX(…) operators (doOnNext(…), doOnSuccess(…), …) for callbacks when the publisher emits particular signals. These hook methods allow convenient interception of elements for non-blocking consumption.
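As a rough sketch of these observations, the queue publication can be composed into the flow with flatMap(…) instead of subscribing by hand, so that a single subscription (made by the framework) runs the save and then the send. Everything here is an assumption for illustration: it needs reactor-core on the classpath, uses plain strings instead of Person, and save and sendBlocking are hypothetical stand-ins for personRepository.save(…) and rabbitTemplate.convertAndSend(…). It offloads the blocking call with subscribeOn(…) on the inner Mono rather than publishOn(…), and Schedulers.boundedElastic() assumes Reactor 3.3 or later.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class PersonServiceSketch {

    // Counts how often the simulated insert actually runs.
    static final AtomicInteger inserts = new AtomicInteger();

    // Hypothetical in-memory stand-in for the "persons" queue.
    static final List<String> queue = new CopyOnWriteArrayList<>();

    // Hypothetical stand-in for personRepository.save(person).
    static Mono<String> save(String person) {
        return Mono.fromCallable(() -> {
            inserts.incrementAndGet();
            return person;
        });
    }

    // Hypothetical stand-in for the blocking rabbitTemplate.convertAndSend("persons", p).
    static void sendBlocking(String person) {
        queue.add(person);
    }

    // Only composes the flow; there is no subscribe() call here.
    static Mono<String> addPerson(String person) {
        return save(person)
            .flatMap(saved ->
                // Wrap the blocking send and run it on a worker thread,
                // then continue the flow with the saved value.
                Mono.fromCallable(() -> {
                        sendBlocking(saved);
                        return saved;
                    })
                    .subscribeOn(Schedulers.boundedElastic()));
    }

    public static void main(String[] args) {
        // block() is used only to drive the demo; in WebFlux the framework subscribes.
        String result = addPerson("Alice").block();
        System.out.println(result + " inserts=" + inserts.get() + " queue=" + queue);
    }
}
```

Because the send is part of the returned Mono, the controller's single subscription triggers one insert and one message. If the send fails, the error propagates to the caller, which may or may not be what you want for a notification side effect.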
Upvotes: 5