Mike Summers
Mike Summers

Reputation: 2239

Waiting for running Reactor Mono instances to complete

I wrote this code to spin off a large number of WebClients (limited by reactor.ipc.netty.workerCount), start the Mono immediately, and wait for the all Monos to complete:

   List<Mono<List<MetricDataModel>>> monos = new ArrayList<>(metricConfigs.size());
   for (MetricConfig metricConfig : metricConfigs) {
        try {
            monos.add(extractMetrics.queryMetricData(metricConfig)
                  .doOnSuccess(result -> {
                      metricDataList.addAll(result);
                  })
                  .cache());
        } catch (Exception e) {
        }
    }

    Mono.when(monos)
          .doFinally(onFinally -> {
              Map<String, Date> latestMap;
              try {
                  latestMap = extractInsights.queryInsights();
                  Transform transform = new Transform(copierConfig.getEventType());
                  ArrayList<Event> eventList = transform.toEvents(latestMap, metricDataList);
              } catch (Exception e) {
                  log.error("copy: mono: when: {}", e.getMessage(), e);
              }
          })
          .block();

It 'works', that is the results are as expected.

Two questions:

  1. Is this correct? Does cache() result in the when waiting for all Monos to complete?
  2. Is it efficient? Is there a way to make this faster?

Thanks.

Upvotes: 0

Views: 5488

Answers (1)

Brian Clozel
Brian Clozel

Reputation: 59086

You should try as much as possible to:

  • use Reactor operators and compose a single reactive chain
  • avoid using doOn* operators for something other than side-effects (like logging)
  • avoid shared state

Your code could look a bit more like

List<MetricConfig> metricConfigs = //...
Mono<List<MetricDataModel>> data = Flux.fromIterable(metricConfigs)
    .flatMap(config -> extractMetrics.queryMetricData(config))
    .collectList();

Also, the cache() operator does not wait the completion of the stream (that's actually then()'s job).

Upvotes: 2

Related Questions