Reputation: 2239
I wrote this code to spin off a large number of WebClients (limited by reactor.ipc.netty.workerCount), start each Mono immediately, and wait for all the Monos to complete:
List<Mono<List<MetricDataModel>>> monos = new ArrayList<>(metricConfigs.size());
for (MetricConfig metricConfig : metricConfigs) {
    try {
        monos.add(extractMetrics.queryMetricData(metricConfig)
                .doOnSuccess(result -> {
                    metricDataList.addAll(result);
                })
                .cache());
    } catch (Exception e) {
    }
}
Mono.when(monos)
        .doFinally(onFinally -> {
            Map<String, Date> latestMap;
            try {
                latestMap = extractInsights.queryInsights();
                Transform transform = new Transform(copierConfig.getEventType());
                ArrayList<Event> eventList = transform.toEvents(latestMap, metricDataList);
            } catch (Exception e) {
                log.error("copy: mono: when: {}", e.getMessage(), e);
            }
        })
        .block();
It 'works', that is, the results are as expected.
Two questions:
Does cache() result in the when waiting for all Monos to complete?
Thanks.
Upvotes: 0
Views: 5488
Reputation: 59086
You should try as much as possible to avoid using doOn* operators for anything other than side effects (like logging).
Your code could look a bit more like:
List<MetricConfig> metricConfigs = //...
Mono<List<MetricDataModel>> data = Flux.fromIterable(metricConfigs)
        .flatMap(config -> extractMetrics.queryMetricData(config))
        .collectList();
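A self-contained sketch of the same idea may help, assuming reactor-core is on the classpath. Here `queryMetricData` is a hypothetical stand-in for `extractMetrics.queryMetricData`, and plain strings replace `MetricConfig` and `MetricDataModel`. The per-query lists are flattened into one list inside the chain, so no shared `metricDataList` and no `doOnSuccess` are needed. Skipping failed queries with `onErrorResume` is an assumption about the desired behaviour, not something the question specifies.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ComposeSketch {
    // Hypothetical stand-in for extractMetrics.queryMetricData(config).
    static Mono<List<String>> queryMetricData(String config) {
        return Mono.fromSupplier(() -> List.of(config + "-a", config + "-b"));
    }

    // Runs one query per config and merges every result into a single list.
    static Mono<List<String>> collectAll(List<String> configs) {
        return Flux.fromIterable(configs)
                .flatMap(config -> queryMetricData(config)
                        // Assumption: a failed query is skipped instead of failing the whole chain.
                        .onErrorResume(e -> Mono.empty()))
                // Flatten each per-query list into individual elements.
                .flatMapIterable(list -> list)
                .collectList();
    }

    public static void main(String[] args) {
        // block() is used here only to observe the result in a plain main method.
        List<String> all = collectAll(List.of("cpu", "mem")).block();
        System.out.println(all);
    }
}
```

Any follow-up work (such as the transform step in the question) could then be chained onto the returned Mono with `map` or `flatMap` rather than placed in `doFinally`.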
Also, the cache() operator does not wait for the completion of the stream (that's actually then()'s job).
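To make the difference concrete, here is a minimal sketch, assuming reactor-core is on the classpath; the class and method names are made up for illustration. It shows that `cache()` only replays the result to later subscribers (the source runs at most once, and not at all until someone subscribes), while `then()` gives a `Mono<Void>` that completes when the source completes. In the question's code, the waiting comes from `Mono.when(...)` together with `block()`, not from `cache()`.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class CacheVsThen {
    // Returns how many times the source actually ran for the given number of subscribers.
    static int runsWithCache(int subscribers) {
        AtomicInteger runs = new AtomicInteger();
        Mono<Integer> source = Mono.fromSupplier(runs::incrementAndGet);
        Mono<Integer> cached = source.cache();
        for (int i = 0; i < subscribers; i++) {
            cached.block(); // each block() is a new subscription to the cached Mono
        }
        return runs.get();
    }

    // Returns true if the delayed source had already emitted by the time then() completed.
    static boolean thenWaitsForCompletion() {
        AtomicBoolean finished = new AtomicBoolean(false);
        Mono<Void> done = Mono.delay(Duration.ofMillis(50))
                .doOnNext(tick -> finished.set(true)) // observation only
                .then();
        done.block(); // returns once the delayed source has completed
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runsWithCache(0)); // no subscriber, so the source never runs
        System.out.println(runsWithCache(3)); // three subscribers share one run
        System.out.println(thenWaitsForCompletion());
    }
}
```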
Upvotes: 2