rajesh kumar
rajesh kumar

Reputation: 121

Pass through object downstream in the functional pipeline in Reactor

I am new to reactor and reactive programming and I am trying to solve the below case.

I receive a flux of objects from Kafka topic and for each record in flux, I need to call 2 services and validate the object.

public void consume(Flux<Data> flux)
{
flux.map(data->callRESTService1(data)).map(...<I need the data once again here to call rest service 2>
}

Right now i am using the below style to achieve this but is there any better/right way to do this?

public void consume(Flux<Data> flux)
{
   flux.subscribe(data->handleData(data));
}


 public void handleData(data)
    {
 Flux.concat(callRestService1(data),callRestService2(data)).reduce(data,reduce());
    }

Also, If one of service is down, i need to propagate error on the listener so that the message is not acknowledged but on the other case if validation fails , need to publish a message to another topic.

Upvotes: 0

Views: 1812

Answers (2)

Ricard Kollcaku
Ricard Kollcaku

Reputation: 1712

you can try to zip in flatmap like this

Flux<Strings>  flux = Flux.just("d");
flux.flatMap(strings -> {
return Flux.zip(callRestService1(strings).onErrorResume(throwable -> dosomeshits(throwable)),callRestService2(strings).onErrorResume(throwable -> dosomeshits(throwable)),(t1, t2) -> t1)

})

Upvotes: 0

Simon Basl&#233;
Simon Basl&#233;

Reputation: 28351

The fact that you need your original element for both path and that each path has a different way of dealing with errors is a good indicator that you probably want a flatMap:

Flux<Data> source; //= ...
return source.flatMap(value -> {
    Mono<IgnoreMe1> service1 = callRestService1(value);
    Mono<IgnoreMe2> service2 = callRestService2(value)
        .onErrorResume(e -> postErrorToTopic(e, value)); //might need some type massaging, eg. if the post to topic method returns a `Mono<Void>`

    //wait for the two to complete, propagate their errors if any, else return original value
    return Mono.when(service1, service2)
       .thenReturn(value);
}

Upvotes: 1

Related Questions