naval jain
naval jain

Reputation: 393

Dealing with parallel flux in Reactor

I have created a parallet flux from iterable. And on each iterable I have to make a rest call. But while executing even if any of the request fails , all the remaining requests also fail. I want all the requests to be executed irrespective of failure or success.

I am currently using Flux.fromIterable and using runOn operator

Flux.fromIterable(actions)
.parallel()
.runOn(Schedulars.elastic())
.flatMap(request -> someRemoteCall)     
.sequential()
.subscribe();

I want all the requests in iterable to be executed , irrespective of the failure or success. But as of now some gets executed and some gets failed.

Upvotes: 10

Views: 16356

Answers (4)

Adrian
Adrian

Reputation: 3731

Because of .parallel().runOn(...) usage you can't use onErrorContinue as below:

.parallel()
.runOn(...)
.flatMap(request -> someRemoteCall)
.onErrorContinue(...)

but you might be able to use it like this:

.parallel().runOn(...)
.flatMap(request -> someRemoteCall
        .onErrorContinue((t, o) -> log.error("Skipped error: {}", t.getMessage()))
)

provided that someRemoteCall is a Mono or Flux not itself run on .parallel().runOn(...) rails.

But when you don't have a someRemoteCall you can do the trick below (see NOT_MONO_AND_NOT_FLUX) to ignore the unsafe processing run on .parallel().runOn(...) rails:

Optional<List<String>> foundImageNames = 
    Flux.fromStream(this.fileStoreService.walk(path))
        .parallel(cpus, cpus)
        .runOn(Schedulers.newBoundedElastic(cpus, Integer.MAX_VALUE, "import"), 1)

        .flatMap(NOT_MONO_AND_NOT_FLUX -> Mono
            .just(NOT_MONO_AND_NOT_FLUX)
            .map(path -> sneak(() -> unsafeLocalSimpleProcessingReturningString(path)))
            .onErrorContinue(FileNotFoundException.class,
                (t, o) -> log.error("File missing:\n{}", t.getMessage()))
        )

        .collectSortedList(Comparator.naturalOrder())
        .blockOptional();

Upvotes: 3

Michael Berry
Michael Berry

Reputation: 72379

There's three possible ways I generally use to achieve this:

  • Use the 3 argument version of flatMap(), the second of which is a mapperOnError -eg. .flatMap(request -> someRemoteCall(), x->Mono.empty(), null);
  • Use onErrorResume(x -> Mono.empty()) as a separate call to ignore any error;
  • Use .onErrorResume(MyException.class, x -> Mono.empty())) to just ignore errors of a certain type.

The second is what I tend to use by default, as I find that clearest.

Upvotes: 4

Martin Tarj&#225;nyi
Martin Tarj&#225;nyi

Reputation: 9987

There are delay error operators in Reactor. You could write your code as follows:

Flux.fromIterable(actions)
    .flatMapDelayError(request -> someRemoteCall(request).subscribeOn(Schedulers.elastic()), 256, 32)
    .doOnNext(System.out::println)
    .subscribe();

Note that this will still fail your flux in case of any inside publisher emits error, however, it will wait for all inner publishers to finish before doing that.

These operators also require to specify the concurrency and prefetch parameters. In the example I've set them to their default values which is used in regular flatMap calls.

Upvotes: 0

Puce
Puce

Reputation: 38152

I'm still in the process of learning WebFlux and Reactor, but try one of the onErrorContinue directly after flatMap (REST call) to drop (and potentially log) errors.

Upvotes: 0

Related Questions