Reputation: 23145
I have the following reactive chain defined:
Flux<Tuple3<A, B, C>> enrich(List<String> idList) {
return aEnricher.getAById(idList)
.zipWith(bEnricher.getBByLookupId(lookupIds))
.zipWith(cEnricher.getCByLookupId(lookupIds))
.map(tuple -> Tuples.of(tuple.getT1().getT1(), tuple.getT1().getT2(), tuple.getT2()));
}
Function Signatures:
Flux<A> getAById(List<String> idList)
Flux<B> getBByLookupId(List<String> lookupIds)
Flux<C> getCByLookupId(List<String> lookupIds)
lookupId
is received as part of object A
from the first api call.
This is called here:
combinedEnricher.enrich(events).subscribe(this::processTuple);
My question is this. I have multiple different enrichers to be added to zipWith
. As per documentation, zipWith
will complete when one of the publisher completes. But in my case, different enrichers will emit different number of Flux and I need to process all of them.
How can I achieve this? Since the Flux types are different, I cannot use merge here
EDIT
Option A
aEnricher.getAById(idList).buffer(10).subscribe( lookupIds -> {
bEnricher.getBByLookupId(lookupIds).subscribe();
cEnricher.getCByLookupId(lookupIds).subscribe();
}
Mono<Void> getBByLookupId(List<String> lookupIds) {
Flux.just(lookupIds)
.flatMap(lookupId -> serviceB.callApi(lookupId))
.map(this::convertToAnotherObject)
.doOnNext(this::sendToKafka)
.then();
}
Mono<Void> getCByLookupId(List<String> lookupIds) {
Flux.just(lookupIds)
.flatMap(lookupId -> serviceC.callApi(lookupId))
.map(this::convertToAnotherObject)
.doOnNext(this::sendToKafka)
.then();
}
Option B
aEnricher.getAById(idList)
.buffer(10)
.flatMap(lookupIds ->
Mono.zip(
Mono.just(lookupIds),
aEnricher.getBByLookupId(lookupIds).collectList(),
aEnricher.getCByLookupId(lookupIds).collectList()
)
)
.map(convertToTuple3)
.map(this::sendToKafka)
Upvotes: 0
Views: 1131
Reputation: 5982
Based on the update question I would propose the following
Flux<Void> enrich(List<String> idList) {
return enricher.getAById(idList)
.buffer(10)
.flatMap(lookupIds ->
Mono.zip(
sendToKafka(Flux.fromIterable(lookupIds)),
sendToKafka(enricher.getBByLookupId(lookupIds)),
sendToKafka(enricher.getCByLookupId(lookupIds))
).then()
);
}
This logic assumes that getXByLookupId
returns Flux<T>
.
Having in mind you are using Reactor Kafka, sendToKafka
could look like. Probably you would need to have sendToKafka
for every specific type.
private <T> Mono<Void> sendToKafka(Flux<T> data) {
return kafkaSender.createOutbound()
.send(data)
.then();
}
Upvotes: 1