Kasiaras Dimitrios
Kasiaras Dimitrios

Reputation: 261

Webflux Controller 'return Object instead of Mono'

Hello I am new to Webflux I follow a tutorial for building reactive microservices. In my project I faced the following problem.

I want to create a crud api for the product service and the following is the Create method

@Override
public Product createProduct(Product product) {

    Optional<ProductEntity> productEntity = Optional.ofNullable(repository.findByProductId(product.getProductId()).block());
             productEntity.ifPresent((prod -> {
                  throw new InvalidInputException("Duplicate key, Product Id: " + product.getProductId());
              }));

    ProductEntity entity = mapper.apiToEntity(product);
    Mono<Product> newProduct = repository.save(entity)
        .log()
        .map(mapper::entityToApi);

    return newProduct.block();
}

The problem is that when I call this method from postman I get the error "block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3" but when I use a StreamListener this call works ok. The stream Listener gets events from a rabbit-mq channel

StreamListener

@EnableBinding(Sink.class)
public class MessageProcessor {

    private final ProductService productService;

    public MessageProcessor(ProductService productService) {
        this.productService = productService;
    }

    @StreamListener(target = Sink.INPUT)
    public void process(Event<Integer, Product> event) {

       
        switch (event.getEventType()) {

            case CREATE:
                Product product = event.getData();
                LOG.info("Create product with ID: {}", product.getProductId());
                productService.createProduct(product);
                break;

   
            default:
                String errorMessage = "Incorrect event type: " + event.getEventType() + ", expected a CREATE or DELETE event";
                LOG.warn(errorMessage);
                throw new EventProcessingException(errorMessage);
        }
    }
}

I Have two questions.

  1. Why this works with The StreamListener and not with a simple request?
  2. Is there a proper way in webflux to return the object of the Mono or we always have to return a Mono?

Upvotes: 1

Views: 2623

Answers (1)

Joseph Berry
Joseph Berry

Reputation: 188

Your create method would want to look more like this and you would want to return a Mono<Product> from your controller rather than the object alone.

  public Mono<Product> createProduct(Product product) {
    return repository.findByProductId(product.getProductId())
        .switchIfEmpty(Mono.just(mapper.apiToEntity(product)))
        .flatMap(repository::save)
        .map(mapper::entityToApi);
  }

As @Thomas commented you are breaking some of the fundamentals of reactive coding and not getting the benefits by using block() and should read up on it more. For example the reactive mongo repository you are using will be returning a Mono which has its own methods for handling if it is empty without needing to use an Optional as shown above.

EDIT to map to error if entity already exists otherwise save

  public Mono<Product> createProduct(Product product) {
    return repository.findByProductId(product.getProductId())
        .hasElement()
        .filter(exists -> exists)
        .flatMap(exists -> Mono.error(new Exception("my exception")))
        .then(Mono.just(mapper.apiToEntity(product)))
        .flatMap(repository::save)
        .map(mapper::entityToApi);
  }

Upvotes: 2

Related Questions