Reputation: 180
I want get object/objects from Flux/Mono. I use
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
I do something like that:
Mono<UserEntity> byEmail = userRepository.findByEmail(userDto.getEmail());
UserEntity block = byEmail.block();
And I have error:
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3
Why? There is any different way to get object/objects?
How in reactive programming do something that: In RequestBody you have UserDto.
Check the email exist in database if not create user.
return userRepository.findByEmail(userDto.getEmail())
//rethrow DB errors as BadRequestException, except user exists
.onErrorResume(t -> Mono.error(new BadRequestException()))
//throwing in map is converted to onError signal
.map(ifItGoesThereItExist-> { throw new UserByEmailExistException(); })
//switchIfEmpty to replace an empty source with a different Mono
.switchIfEmpty(createUser(userDto))
//at this point, map can only receive an onNext from the createUser
.map(u -> new ResponseEntity(u, HttpStatus.CREATED))
;
Upvotes: 2
Views: 8259
Reputation: 28301
You seem to have found an answer, but let me elaborate a bit on it.
why you can't block
The error message you get indicates that you try to revert to blocking behavior inside a specific Thread
(or thread pool) that is not adapted to blocking. This is the thread used by Spring Webflux (and under the cover, Netty) to process every incoming request in the application. As a consequence, if you block it, you prevent your application from serving new requests at all.
Your answer, and a few small improvements
First off, the map
could be simplified because null
values are not allowed in a Flux
or Mono
. The Spring Data ReactiveCrudRepository
will return an empty Mono
if the value is not in DB, not to be confused with a Mono
that emits null
:
---( null )---|->
onNext(null), onComplete() // onNext(null) being forbidden
vs
---|->
onComplete()
Also, I think that with onErrorResume
you intended to wrap DB errors except the "user exists" case? If so, the placement of this onErrorResume
is wrong because it is similar to a catch(Throwable e)
that will also catch the UserByEmailExistException
. Put it before the map
. You can also directly throw from within the map.
So this boils down to detecting an empty Mono
vs valued Mono
, replace the empty one by an asynchronous DB save and the valued one by an onError
:
TL;DR Gimme the code
return userRepository.findByEmail(userDto.getEmail())
//rethrow DB errors as BadRequestException, except user exists
.onErrorResume(t -> Mono.error(new BadRequestException()))
//throwing in map is converted to onError signal
.map(ifItGoesThereItExist-> { throw new UserByEmailExistException(); })
//switchIfEmpty to replace an empty source with a different Mono
.switchIfEmpty(createUser(userDto))
//at this point, map can only receive an onNext from the createUser
.map(u -> new ResponseEntity(u, HttpStatus.CREATED))
;
Upvotes: 4