Reputation: 121
I am new to reactor and reactive programming and I am trying to solve the below case.
I receive a flux of objects from Kafka topic and for each record in flux, I need to call 2 services and validate the object.
public void consume(Flux<Data> flux)
{
flux.map(data->callRESTService1(data)).map(...<I need the data once again here to call rest service 2>
}
Right now i am using the below style to achieve this but is there any better/right way to do this?
public void consume(Flux<Data> flux)
{
flux.subscribe(data->handleData(data));
}
public void handleData(data)
{
Flux.concat(callRestService1(data),callRestService2(data)).reduce(data,reduce());
}
Also, If one of service is down, i need to propagate error on the listener so that the message is not acknowledged but on the other case if validation fails , need to publish a message to another topic.
Upvotes: 0
Views: 1812
Reputation: 1712
you can try to zip in flatmap like this
Flux<Strings> flux = Flux.just("d");
flux.flatMap(strings -> {
return Flux.zip(callRestService1(strings).onErrorResume(throwable -> dosomeshits(throwable)),callRestService2(strings).onErrorResume(throwable -> dosomeshits(throwable)),(t1, t2) -> t1)
})
Upvotes: 0
Reputation: 28351
The fact that you need your original element for both path and that each path has a different way of dealing with errors is a good indicator that you probably want a flatMap
:
Flux<Data> source; //= ...
return source.flatMap(value -> {
Mono<IgnoreMe1> service1 = callRestService1(value);
Mono<IgnoreMe2> service2 = callRestService2(value)
.onErrorResume(e -> postErrorToTopic(e, value)); //might need some type massaging, eg. if the post to topic method returns a `Mono<Void>`
//wait for the two to complete, propagate their errors if any, else return original value
return Mono.when(service1, service2)
.thenReturn(value);
}
Upvotes: 1