Deluxxx

Reputation: 180

Mongo Flux/Mono get object/s

I want to get an object (or objects) out of a Flux/Mono. I use these dependencies:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

I do something like this:

    Mono<UserEntity> byEmail = userRepository.findByEmail(userDto.getEmail());
    UserEntity block = byEmail.block();

And I get this error:

    java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3

Why? Is there a different way to get the object/objects?

How would you do the following in reactive programming: the request body contains a UserDto; check whether the email already exists in the database and, if it does not, create the user.

ANSWER UPDATE FROM @simon-baslé

    return userRepository.findByEmail(userDto.getEmail())
        //rethrow DB errors as BadRequestException, except user exists
        .onErrorResume(t -> Mono.error(new BadRequestException()))
        //throwing in map is converted to onError signal
        .map(ifItGoesThereItExist -> { throw new UserByEmailExistException(); })
        //switchIfEmpty to replace an empty source with a different Mono
        .switchIfEmpty(createUser(userDto))
        //at this point, map can only receive an onNext from the createUser
        .map(u -> new ResponseEntity(u, HttpStatus.CREATED))
        ;

Upvotes: 2

Views: 8259

Answers (1)

Simon Baslé

Reputation: 28301

You seem to have found an answer, but let me elaborate a bit on it.

Why you can't block

The error message indicates that you are trying to revert to blocking behavior inside a specific thread (or thread pool) that is not meant for blocking. This is the thread used by Spring WebFlux (and, under the hood, Netty) to process every incoming request in the application. As a consequence, if you block it, you prevent your application from serving new requests at all.
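To make the starvation effect concrete, here is a minimal sketch that uses a plain JDK single-threaded executor as a stand-in for one event-loop thread. It is not Netty or WebFlux code, and the class and method names (`BlockedEventLoopSketch`, `handleRequests`) are made up for illustration. One "request" blocks its thread, and the cheap requests queued behind it cannot start until it finishes:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BlockedEventLoopSketch {

    // Runs three "requests" on one thread and returns the order in which they completed.
    static List<String> handleRequests() throws InterruptedException {
        // Assumption: a single-threaded executor stands in for one event-loop thread.
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        List<String> completed = new CopyOnWriteArrayList<>();

        // The first request blocks its thread, much like block() waiting on the database.
        eventLoop.execute(() -> {
            try {
                Thread.sleep(200); // simulated blocking wait
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            completed.add("slow");
        });

        // These requests are cheap, but they are stuck in the queue behind the blocked one.
        eventLoop.execute(() -> completed.add("fast-1"));
        eventLoop.execute(() -> completed.add("fast-2"));

        eventLoop.shutdown();
        eventLoop.awaitTermination(5, TimeUnit.SECONDS);
        return completed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(handleRequests()); // prints [slow, fast-1, fast-2]
    }
}
```

A real WebFlux application typically has a handful of such threads rather than one, but the principle is the same: every thread parked in a blocking call is a thread that cannot pick up other requests.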

Your answer, and a few small improvements

First off, the map could be simplified because null values are not allowed in a Flux or Mono. The Spring Data ReactiveCrudRepository will return an empty Mono if the value is not in DB, not to be confused with a Mono that emits null:

    ---( null )---|->
    onNext(null), onComplete() // onNext(null) being forbidden

vs

    ---|->
    onComplete()

Also, I think that with onErrorResume you intended to wrap DB errors except the "user exists" case? If so, the placement of this onErrorResume is wrong because it is similar to a catch(Throwable e) that will also catch the UserByEmailExistException. Put it before the map. You can also directly throw from within the map.
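The catch(Throwable e) comparison can be spelled out in plain synchronous Java. This is only an analogy, not Reactor code: the in-memory `USERS` map stands in for the repository, and the two exception classes are minimal stand-ins for the ones in the question. The point is that a catch-all placed after the "exists" check swallows that exception too, while one placed before it only covers the lookup:

```java
import java.util.Map;

public class CatchPlacementSketch {
    static class BadRequestException extends RuntimeException {}
    static class UserByEmailExistException extends RuntimeException {}

    // Hypothetical in-memory "database" standing in for the repository.
    static final Map<String, String> USERS = Map.of("a@example.com", "Alice");

    // Catch-all covers the lookup AND the "exists" check:
    // UserByEmailExistException is rewrapped as BadRequestException.
    static String catchAfterCheck(String email) {
        try {
            String user = USERS.get(email);
            if (user != null) {
                throw new UserByEmailExistException();
            }
            return "created";
        } catch (Throwable t) {
            throw new BadRequestException();
        }
    }

    // Catch-all covers only the lookup:
    // UserByEmailExistException is thrown afterwards and escapes untouched.
    static String catchBeforeCheck(String email) {
        String user;
        try {
            user = USERS.get(email);
        } catch (Throwable t) {
            throw new BadRequestException();
        }
        if (user != null) {
            throw new UserByEmailExistException();
        }
        return "created";
    }
}
```

In the reactive chain, operator order plays the role of the try block's extent: onErrorResume only sees errors raised by the operators above it.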

So this boils down to detecting an empty Mono vs. a valued Mono, then replacing the empty one with an asynchronous DB save and the valued one with an onError:

TL;DR Gimme the code

    return userRepository.findByEmail(userDto.getEmail())
        //rethrow DB errors as BadRequestException, except user exists
        .onErrorResume(t -> Mono.error(new BadRequestException()))
        //throwing in map is converted to onError signal
        .map(ifItGoesThereItExist -> { throw new UserByEmailExistException(); })
        //switchIfEmpty to replace an empty source with a different Mono
        .switchIfEmpty(createUser(userDto))
        //at this point, map can only receive an onNext from the createUser
        .map(u -> new ResponseEntity(u, HttpStatus.CREATED))
        ;

Upvotes: 4
