Andreas Schilling
Andreas Schilling

Reputation: 95

Subscribing twice on Spring Data MongoDB save() results in double insert

We encountered the following behaviour which we do understand, however we'd like to find out whether it is expected and whether it might be of interest to document it as some kind of pitfall.

We're experimenting with Spring Boot 2/Spring WebFlux and set up a small application that basically has something like this (all shortened):

@PostMapping
public Mono<Todo> addTodos( @RequestBody Person person ) {
    return personService.addPerson( person );
}

The service first looked like this, as we want to publish the event of a person addition also to a message queue:

public class PersonService {
    public Mono<Person> addPerson( Person person ) {
        Mono<Person> addedPerson = personRepository.save( person );
        addedPerson.subscribe( p -> rabbitTemplate.convertAndSend( "persons", p ) );
        return addedPerson;
    }
}

So, this is obviously wrong to do it like that. The .subscribe() triggers the flow and we assume that the reactive REST controller does the same in the background before serializing the data for the response, resulting in a second parallel flow. In the end we ended up with two duplicate entries in the persons collection in the database.

After this lengthy introduction finally the question: is this expected behaviour that multiple subscribers trigger multiple inserts (basically, if you subscribe n times you get n inserts)?

If yes this might be a pitfall to emphasize for beginners, especially if our understanding is correct, that the reactive REST controllers perform a .subscribe() under the hoods.

Upvotes: 0

Views: 3249

Answers (1)

mp911de
mp911de

Reputation: 18119

You came yourself to the conclusion which describes the expected behavior.

The reactive programming model differs from an imperative programming model in various areas.

Imperative programming combines transformations, mapping, execution, and other aspects. You express these by creating conditional/loop flows, method invocations that may return values and pass values on to API calls.

Reactive programming decouples the declaration of what is happening from how it's going to be executed. Execution using reactive infrastructure is divided into two parts: Reactive sequence composition and the actual execution. In your code, you only compose reactive sequences. Execution happens outside of your code.

When you compose a Publisher, then the resulting Publisher contains a declaration of things that will happen if executed. A Publisher does not imply whether it's going to be executed in the first place nor how many subscribers will subscribe eventually.

Taken the example from above, Mono<Person> PersonRepository.save(…) returns a publisher that:

  1. Maps data from Person to Document
  2. Saves the Document to MongoDB and
  3. Emits the saved Person once a response from MongoDB comes back

It's a recipe for saving data using a specific repository method. Creating the publisher does not execute the publisher and the publisher is not opinionated on the number of executions. Multiple calls to .subscribe() execute the publisher multiple times.

I'd argue .subscribe() is not a pitfall. A reactive programming model approach takes the execution out of your way. If you call .subscribe() or .block(), then you should have a very good reason for doing so. Every time you see .subscribe() or .block() in your code you should pay extra attention whether that's the right thing to do. Your execution environment is in charge of subscribing to Publishers.


A few observations:

  • RabbitTemplate is a blocking API. You should not mix reactive and blocking APIs. If you have no other option, then offload blocking calls to a worker. Use either publishOn(…) along a Scheduler before the actual operator containing the blocking work or use ExecutorService/CompletableFuture together with flatMap(…).
  • Use flatMap(…) operators for reactive flow composition of Mono/Flux. The flatMap(…) operator starts non-blocking subprocesses that complete eventually and continue the flow.
  • Use doOnXXX(…) operators (doOnNext(…), doOnSuccess(…), …) for callbacks when the publisher emits particular signals. These hook methods allow convenient interception of elements non-blocking consumption.

References:

Upvotes: 5

Related Questions