Mulgard
Mulgard

Reputation: 10609

How to map a flux with mono?

I have these two requests:

Flux<ProductID> getProductIds() {
    return this.webClient.get()
            .uri(PRODUCT_ID_URI)
            .accept(MediaType.APPLICATION_STREAM_JSON)
            .retrieve()
            .bodyToFlux(ProductID.class);
}

Mono<Product> getProduct(String id) {
    return this.itemServiceWebClient.get()
            .uri(uriBuilder -> uriBuilder.path(PRODUCT_URI + "/{id}")
                    .build(id))
            .accept(MediaType.APPLICATION_STREAM_JSON)
            .exchange()
            .flatMap(clientResponse -> clientResponse.bodyToMono(Product.class));
}

And with these I want to do the following:

Flux<Product> getProducts() {
    return Flux.create(sink -> this.gateway.getProductIds()
            .doOnComplete(() -> {
                log.info("PRODUCTS COMPLETE");

                sink.complete();
            })
            .flatMap(productId -> this.getProduct(productId.getID()))
            .subscribe(product -> {
                log.info("NEW PRODUCT: " + product);

                sink.next(product);
            }));
}

When I call this I get the following output:

PRODUCTS COMPLETE
NEW PRODUCT: ...
NEW PRODUCT: ...
....

Ofcourse the stream is closing before the results are actually there because the the async mono mapping. How can I keep this non-blocking but also making sure that the results arrive before the on complete is called?

Upvotes: 1

Views: 4613

Answers (1)

Brian Clozel
Brian Clozel

Reputation: 59231

Assuming getProducts is a controller method and that you want to add those products in your model to be rendered in a view template, you could solve this problem like this:

@GetMapping("/products")
public String getProducts(Model model) {

    Flux<Product> products = this.gateway.getProductIds()
            .flatMap(productId -> this.getProduct(productId.getID()));
    model.put("products", products);
    // return the view name
    return "showProducts";
}

Upvotes: 2

Related Questions