Andrew Kuleshov

Reputation: 199

How to build a reactive architecture that avoids nested Flux types (Flux<Flux<T>>)?

I am trying to build an application A (acting as an adapter) that will:

1) Receive POST requests containing some key (JSON format).

2) Modify that key somehow and send a POST request to another system B.

3) Parse the response from application B and modify that response.

4) Reply to the initial POST request with the modified response.

@RestController
@RequestMapping("/A")
public class Controller {
    @ResponseStatus(HttpStatus.OK)
    @PostMapping(value = "B", consumes = APPLICATION_JSON_VALUE)
    // to return nested Flux is a bad idea here
    private Flux<Flux<Map<String, ResultClass>>> testUpdAcc(@RequestBody Flux<Map<String, SomeClass>> keys) {
        return someMethod(keys);
    }

    // the problem comes here when I will get Flux<Flux<T>> in the return
    public Flux<Flux<Map<String, ResultClass>>> someMethod(Flux<Map<String, SomeClass>> keysFlux) {
        return keysFlux.map(keysMap -> {
                                // do something with keys and create URL
                                // also will batch keys here
                                <...>

                                // for each batch of keys:
                                WebClient.create(hostAndPort)
                                .method(HttpMethod.POST)
                                .uri(url)
                                .body(BodyInserters.fromObject(body))
                                .header(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded")
                                .accept(MediaType.APPLICATION_JSON)
                                .retrieve()
                                .bodyToMono(schema) // response will be parsed into some schema here
                                .retryWhen(/* retry mechanism goes here */);

                                // ===== join all Mono batches into a single Flux
                                return Flux.concat(...);
                                }
                             );
    }

}

Of course, this can be fixed by reading the request body as a Map instead of a Flux. But that would make everything less reactive, no? :)

    @ResponseStatus(HttpStatus.OK)
    @PostMapping(value = "B", consumes = APPLICATION_JSON_VALUE)
    // no nested Flux here, but the request body is no longer read reactively
    private Flux<Map<String, ResultClass>> testUpdAcc(@RequestBody Map<String, SomeClass> keys) {
        return someMethod(keys);
    }

I also tried calling block()/blockFirst() at the last moment, right before returning the response, but got an error:

block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor...

Thank you for your ideas!

Upvotes: 0

Views: 784

Answers (2)

Vimalesh Jeyavelmani

Reputation: 19

Try zipping the Flux instances like this:

Flux.zip(flux1,flux2)

This produces a Flux of Tuple2 elements, which you can then process with flatMap.

Thanks,
Vimalesh

Upvotes: 0

Andrew Kuleshov

Reputation: 199

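Never mind my question: we can simply use flatMap instead of map in someMethod. That solves the Flux-inside-Flux problem, because flatMap subscribes to each inner Flux and merges its elements into the outer one, so the return type becomes Flux<Map<String, ResultClass>> instead of Flux<Flux<Map<String, ResultClass>>>.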
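
To illustrate the difference in shape, here is a minimal sketch that uses java.util.stream rather than Reactor, only so that it runs without extra dependencies. It is an analogy, not the WebFlux code itself: Stream.map with a function that returns a Stream yields a nested Stream<Stream<T>>, while Stream.flatMap yields a flat Stream<T>, and Flux.map / Flux.flatMap behave the same way with respect to nesting. The class FlatMapSketch and the method callB are hypothetical stand-ins for the request to system B.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FlatMapSketch {

    // Hypothetical stand-in for the call to system B: one key in, several results out.
    static Stream<String> callB(String key) {
        return Stream.of(key + "-1", key + "-2");
    }

    // map keeps every inner stream as an element, so the result is nested.
    static Stream<Stream<String>> nested(List<String> keys) {
        return keys.stream().map(FlatMapSketch::callB);
    }

    // flatMap merges the inner streams into one flat sequence of results.
    static List<String> flat(List<String> keys) {
        return keys.stream()
                .flatMap(FlatMapSketch::callB)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b");
        System.out.println(nested(keys).count()); // 2 inner streams, still nested
        System.out.println(flat(keys));           // [a-1, a-2, b-1, b-2]
    }
}
```

One difference to keep in mind: Stream.flatMap preserves the order of the source, whereas Flux.flatMap subscribes to inner publishers eagerly and may interleave their results. If the order of the batches matters, Flux.concatMap is probably the closer match.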

Upvotes: 1
