KJQ

Reputation: 707

Iterate a Flux, execute a Mono inside, use the result in the next step

I want to do the equivalent of the loop below using Project Reactor, where the result of each call is passed into the next call to the same service.

Message current;
Message next = null; // no previous result for the first step
for (Step step : steps) {
    current = new Message(step, next);
    next = execute(current);
}

This is what I am trying to do using Reactor:

  1. For each "step" (in a flux):

    a. Create a message for that step and the last result (null to start).

    b. Invoke the service using the message and get the result (mono).

    c. Store this result as the last result so it can be used in 1a for the next step.

  2. Take the very last result.

My shoddy attempt at this so far looks like:

return fromIterable(request.getPipeline())
    .map(s -> PipelineMessage.builder()
        .client(client)
        .step(s.getStep())
        .build())
    .flatMap(z -> {
        return this.pipelineService.execute(z);
    })
    .last()
    .map(m -> ok()
        .entity(m.getPayload())
        .type(m.getType())
        .build());

Upvotes: 1

Views: 1692

Answers (1)

vins

Reputation: 15370

I'm not sure of your exact requirement, but I think the reduce operator could help here.

Flux<String> stringFlux = Flux.fromIterable(Arrays.asList("a", "b", "c"));

stringFlux
        // "empty" is the seed; the accumulator receives the previous result and the current step
        .reduce("empty", (next, step) -> {
            String current = message(step, next);
            return execute(current);
        })
        .map(String::toUpperCase)
        .subscribe(System.out::println);

Here message and execute are functions like this:

String message(String step, String next){
    return "message[" + step + ":" + next + "]";
}

String execute(String current){
    return "execute(" + current + ")";
}

The final output is the result of the last execute call:

EXECUTE(MESSAGE[C:EXECUTE(MESSAGE[B:EXECUTE(MESSAGE[A:EMPTY])])])
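
To see why this matches the loop in the question, here is a minimal sketch of the same fold written as a plain loop, without Reactor. The class name FoldLoopSketch and the method runPipeline are made up for illustration; message and execute are copied from above.

```java
import java.util.Arrays;
import java.util.List;

class FoldLoopSketch {

    // Same helper as above: wraps the step and the previous result.
    static String message(String step, String next) {
        return "message[" + step + ":" + next + "]";
    }

    // Same helper as above: stands in for the service call.
    static String execute(String current) {
        return "execute(" + current + ")";
    }

    // Plain-loop equivalent of reduce("empty", ...): each result feeds the next message.
    static String runPipeline(List<String> steps) {
        String next = "empty"; // seed, same role as the first argument to reduce
        for (String step : steps) {
            String current = message(step, next);
            next = execute(current);
        }
        return next;
    }

    public static void main(String[] args) {
        System.out.println(runPipeline(Arrays.asList("a", "b", "c")).toUpperCase());
    }
}
```

The accumulator passed to reduce plays the role of the loop body, and the seed plays the role of the initial value of next.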

The initial value passed to reduce cannot be null. Instead, you could use an empty step object and treat it as null.
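
One way to model such a non-null "nothing yet" value is java.util.Optional. The sketch below is plain Java and only illustrates the idea; OptionalSeedSketch, runPipeline and the "message[step]" format for the first step are assumptions, not part of the original code. An Optional seed should work the same way as the first argument to reduce, since it is never null.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class OptionalSeedSketch {

    // Builds the message; an absent previous result marks the first step.
    static String message(String step, Optional<String> previous) {
        return previous
                .map(p -> "message[" + step + ":" + p + "]")
                .orElse("message[" + step + "]");
    }

    // Stands in for the service call.
    static String execute(String current) {
        return "execute(" + current + ")";
    }

    static Optional<String> runPipeline(List<String> steps) {
        // Non-null stand-in for "no result yet".
        Optional<String> previous = Optional.empty();
        for (String step : steps) {
            previous = Optional.of(execute(message(step, previous)));
        }
        return previous;
    }

    public static void main(String[] args) {
        System.out.println(runPipeline(Arrays.asList("a", "b")).orElse("no steps"));
    }
}
```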


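If execute returns a Mono instead of a plain value, the accumulator can carry a Flux<String>:

Flux<String> stringFlux = Flux.fromIterable(Arrays.asList("a", "b", "c"));

stringFlux
        // the seed is itself a Flux, so each step can chain onto the previous asynchronous result
        .reduce(Flux.just("empty"), (Flux<String> next, String step) -> {
            Flux<String> current = message(step, next);
            return current.flatMap(this::execute);
        })
        // reduce yields a Mono<Flux<String>>; flatten it back to a Flux<String>
        .flatMapMany(a -> a)
        .subscribe(System.out::println);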


Flux<String> message(String step, Flux<String> next){
    return next.map(v -> "message(" + v + ":" + step + ")");
}

Mono<String> execute(String current){
    return Mono.just("execute(" + current + ")");
}

Output:

execute(message(execute(message(execute(message(empty:a)):b)):c))
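
The same shape, an accumulator that is itself asynchronous, can be tried without Reactor on the classpath. The sketch below swaps in java.util.concurrent.CompletableFuture, with thenCompose playing roughly the role that flatMap plays above. It is an analogy rather than Reactor code, and AsyncFoldSketch and runPipeline are made-up names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AsyncFoldSketch {

    // Same format as the message helper above, applied to one resolved value.
    static String message(String step, String previous) {
        return "message(" + previous + ":" + step + ")";
    }

    // Stand-in for an asynchronous service call.
    static CompletableFuture<String> execute(String current) {
        return CompletableFuture.supplyAsync(() -> "execute(" + current + ")");
    }

    static CompletableFuture<String> runPipeline(List<String> steps) {
        // The accumulator is a future, seeded with an already completed value.
        CompletableFuture<String> acc = CompletableFuture.completedFuture("empty");
        for (String step : steps) {
            // thenCompose waits for the previous result before starting the next call.
            acc = acc.thenCompose(prev -> execute(message(step, prev)));
        }
        return acc;
    }

    public static void main(String[] args) {
        // join() blocks until the whole chain has completed.
        System.out.println(runPipeline(Arrays.asList("a", "b", "c")).join());
    }
}
```

Each call starts only after the previous one has completed, which is the ordering the question asks for.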

Upvotes: 1
