Reputation: 707
I want to do the equivalent of something like below, where the result from the previous call is used in the next call to the same service using Project Reactor.
Message current;
Message next;
for each (Step step in steps)
{
current = new Message(step, next);
next = execute(current);
}
This is what I am trying to do using reactor:
For each "step" (in a flux)
a. Creating a message for that step and the last result (null to start).
b. Invoke the service using the message and get the result (mono).
c. Set the last message to this result so it can be used in 1a.
Take the very last result
My shoddy attempt at this so far looks like:
return fromIterable(request.getPipeline())
.map(s -> PipelineMessage.builder()
.client(client)
.step(s.getStep())
.build())
.flatMap(z -> {
return this.pipelineService.execute(z);
})
.last()
.map(m -> ok()
.entity(m.getPayload())
.type(m.getType())
.build());
Upvotes: 1
Views: 1692
Reputation: 15370
Not sure of your exact requirement. But I think reduce
function could help here.
Flux<String> stringFlux = Flux.fromIterable(Arrays.asList("a", "b", "c"));
stringFlux
.reduce("empty", (next, step) -> {
String current = message(step, next);
return execute(current);
})
.map(String::toUpperCase)
.subscribe(System.out::println);
Here message and execute are functions are like this.
String message(String step, String next){
return "message[" + step + ":" + next + "]";
}
String execute(String current){
return "execute(" + current + ")";
}
The final output would be the last executed message.
EXECUTE(MESSAGE[C:EXECUTE(MESSAGE[B:EXECUTE(MESSAGE[A:EMPTY])])])
The initial step can not be null
here. Instead you could use an empty step object and treat it as null
.
Flux<String> stringFlux = Flux.fromIterable(Arrays.asList("a", "b", "c"));
stringFlux
.reduce(Flux.just("empty"), (Flux<String> next, String step) -> {
Flux<String> current = message(step, next);
return current.flatMap(this::execute);
})
.flatMapMany(a -> a)
.subscribe(System.out::println);
Flux<String> message(String step, Flux<String> next){
return next.map(v -> "message(" + v + ":" + step + ")");
}
Mono<String> execute(String current){
return Mono.just("execute(" + current + ")");
}
Output:
execute(message(execute(message(execute(message(empty:a)):b)):c))
Upvotes: 1