Reputation: 24659
I have an issue with a reactive chain relying on flatMap()
and switchIfEmpty()
. For some reason, one of the Mono
does not emit anything...
This is the public handler method calling the others:
//Throws: NoSuchElementException: Source was empty
public Mono<ServerResponse> createUser(ServerRequest serverRequest) {
Hooks.onOperatorDebug();
Mono<User> userMono = serverRequest.bodyToMono(User.class);
return validateUser(userMono)
.switchIfEmpty(saveUser(userMono))
.single();
}
This is the first method called by createUser
. Note that it is not called from a switchIfEmpty()
(see above) and it does emit an error if there is any.
private Mono<ServerResponse> validateUser(Mono<User> userMono) {
return userMono
.map(this::computeErrors)
.filter(AbstractBindingResult::hasErrors)
.flatMap(err ->
status(BAD_REQUEST)
.contentType(APPLICATION_JSON)
.body(BodyInserters.fromObject(err.getAllErrors()))
);
}
This is just a helper method:
private AbstractBindingResult computeErrors(User user) {
AbstractBindingResult errors = new BeanPropertyBindingResult(user, User.class.getName());
userValidator.validate(user, errors);
return errors;
}
This is the saveUser
method. It does not emit any result!!. It is called from a switchIfEmpty
(see above).
private Mono<ServerResponse> saveUser(Mono<User> userMono) {
return userMono
.flatMap(userRepository::save)
.flatMap(newUser -> status(CREATED)
.contentType(APPLICATION_JSON)
.body(BodyInserters.fromObject(newUser))
);
}
However, if I just call the saveUser
method directly, it will emit a result.
//Works fine
public Mono<ServerResponse> createUser(ServerRequest serverRequest) {
Hooks.onOperatorDebug();
Mono<User> userMono = serverRequest.bodyToMono(User.class);
return saveUser(userMono) // Compare this to the above version
.single();
}
Can anyone please help figure out why the saveUser
method does not emit anything when called from a switchIfEmpty()
?
Here is the error I get:
java.util.NoSuchElementException: Source was empty
at reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:165) ~[reactor-core-3.3.0.RC1.jar:3.3.0.RC1]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoSingleMono] :
reactor.core.publisher.Mono.single(Mono.java:3898)
org.example.contracttestingdemo.handler.UserHandler.createUser(UserHandler.java:55)
Upvotes: 1
Views: 890
Reputation: 72254
It's not to do with flatMap()
or switchIfEmpty()
directly - it's because you're trying to consume the same Mono
twice:
Mono<User> userMono = serverRequest.bodyToMono(User.class);
return validateUser(userMono)
.switchIfEmpty(saveUser(userMono))
.single();
In the above example, you're passing userMono
first to validateUser()
, and then to saveUser()
(but by that point the User
has already been emitted.)
If you want the publisher to be subscribed to multiple times, and output the same result, you'll need to cache it by calling serverRequest.bodyToMono(User.class).cache();
.
Upvotes: 1