cppcoder

Reputation: 23145

How to combine Flux from multiple publishers and process all of them

I have the following reactive chain defined:

Flux<Tuple3<A, B, C>> enrich(List<String> idList) {
    return aEnricher.getAById(idList)
        .zipWith(bEnricher.getBByLookupId(lookupIds))
        .zipWith(cEnricher.getCByLookupId(lookupIds))
        .map(tuple -> Tuples.of(tuple.getT1().getT1(), tuple.getT1().getT2(), tuple.getT2()));
}

Function Signatures:

Flux<A> getAById(List<String> idList)
Flux<B> getBByLookupId(List<String> lookupIds)
Flux<C> getCByLookupId(List<String> lookupIds)

The lookupId is received as part of object A from the first API call.

This is called here:

combinedEnricher.enrich(events).subscribe(this::processTuple);

My question is this: I have several different enrichers to add via zipWith. According to the documentation, zipWith completes as soon as one of the publishers completes. In my case, however, the enrichers emit different numbers of elements, and I need to process all of them.

How can I achieve this? Since the Flux types are different, I cannot use merge here.
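
To make the truncation concrete, here is a minimal plain-Java sketch. It is deliberately not Reactor code, and ZipTruncationSketch and its zip helper are hypothetical names used only for illustration. It pairs elements by position and stops as soon as either input is exhausted, which mirrors the completion rule that zipWith applies to two publishers.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java illustration (not Reactor): pairwise zipping stops at the shorter input.
class ZipTruncationSketch {

    // Pairs elements by position until either side runs out.
    static <L, R> List<String> zip(List<L> left, List<R> right) {
        List<String> pairs = new ArrayList<>();
        Iterator<L> l = left.iterator();
        Iterator<R> r = right.iterator();
        while (l.hasNext() && r.hasNext()) {
            pairs.add(l.next() + "-" + r.next());
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<String> as = List.of("a1", "a2", "a3");
        List<String> bs = List.of("b1");
        // Only one pair is produced; "a2" and "a3" are never paired.
        System.out.println(zip(as, bs)); // prints [a1-b1]
    }
}
```

With three A elements and one B element, two of the A elements are silently dropped, which is the behaviour I need to avoid.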

EDIT

Option A

aEnricher.getAById(idList).buffer(10).subscribe(lookupIds -> {
    bEnricher.getBByLookupId(lookupIds).subscribe();
    cEnricher.getCByLookupId(lookupIds).subscribe();
});

Mono<Void> getBByLookupId(List<String> lookupIds) {
    // fromIterable emits each id separately; Flux.just(lookupIds) would emit the whole list as one element
    return Flux.fromIterable(lookupIds)
        .flatMap(lookupId -> serviceB.callApi(lookupId))
        .map(this::convertToAnotherObject)
        .doOnNext(this::sendToKafka)
        .then();
}

Mono<Void> getCByLookupId(List<String> lookupIds) {
    return Flux.fromIterable(lookupIds)
        .flatMap(lookupId -> serviceC.callApi(lookupId))
        .map(this::convertToAnotherObject)
        .doOnNext(this::sendToKafka)
        .then();
}

Option B

aEnricher.getAById(idList)
    .buffer(10)
    .flatMap(lookupIds ->
        Mono.zip(
            Mono.just(lookupIds),
            bEnricher.getBByLookupId(lookupIds).collectList(),
            cEnricher.getCByLookupId(lookupIds).collectList()
        )
    )
    .map(convertToTuple3)
    .map(this::sendToKafka);

Upvotes: 0

Views: 1131

Answers (1)

Alex

Reputation: 5982

Based on the updated question, I would propose the following:

Flux<Void> enrich(List<String> idList) {
    return enricher.getAById(idList)
            .buffer(10)
            .flatMap(lookupIds ->
                    Mono.zip(
                            sendToKafka(Flux.fromIterable(lookupIds)),
                            sendToKafka(enricher.getBByLookupId(lookupIds)),
                            sendToKafka(enricher.getCByLookupId(lookupIds))
                    ).then()
            );
}

This logic assumes that getXByLookupId returns Flux<T>.
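
One caveat worth checking against the Reactor version in use: Mono.zip is documented to complete as soon as any of its sources completes empty, cancelling the others, and a Mono<Void> always completes empty. Mono.when, by contrast, waits for all sources to complete, so it may be the safer combinator when every input is a Mono<Void>. The sketch below is only an illustration under assumptions: WhenAllSketch and send are hypothetical names, and an in-memory list stands in for the Kafka sender.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class WhenAllSketch {

    // Hypothetical stand-in for a Kafka send: records each item, then completes empty.
    static <T> Mono<Void> send(Flux<T> data, List<Object> sink) {
        return data.doOnNext(sink::add).then();
    }

    static List<Object> run() {
        List<Object> sent = new CopyOnWriteArrayList<>();
        Flux<String> fast = Flux.just("b1");
        Flux<String> slow = Flux.just("c1", "c2", "c3")
                .delayElements(Duration.ofMillis(20));
        // Mono.when completes only after BOTH sends have completed.
        Mono.when(send(fast, sent), send(slow, sent)).block();
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In the enrich method above, this would amount to swapping Mono.zip(...).then() for Mono.when(...), which already returns Mono<Void>.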

Assuming you are using Reactor Kafka, sendToKafka could look like the following. You would probably need a separate sendToKafka for each specific type.

private <T> Mono<Void> sendToKafka(Flux<T> data) {
    return kafkaSender.createOutbound()
            .send(data)
            .then();
}

Upvotes: 1
