Reputation: 393
I have created a parallet flux from iterable. And on each iterable I have to make a rest call. But while executing even if any of the request fails , all the remaining requests also fail. I want all the requests to be executed irrespective of failure or success.
I am currently using Flux.fromIterable and using runOn operator
Flux.fromIterable(actions)
.parallel()
.runOn(Schedulars.elastic())
.flatMap(request -> someRemoteCall)
.sequential()
.subscribe();
I want all the requests in iterable to be executed , irrespective of the failure or success. But as of now some gets executed and some gets failed.
Upvotes: 10
Views: 16356
Reputation: 3731
Because of .parallel().runOn(...)
usage you can't use onErrorContinue
as below:
.parallel()
.runOn(...)
.flatMap(request -> someRemoteCall)
.onErrorContinue(...)
but you might be able to use it like this:
.parallel().runOn(...)
.flatMap(request -> someRemoteCall
.onErrorContinue((t, o) -> log.error("Skipped error: {}", t.getMessage()))
)
provided that someRemoteCall
is a Mono
or Flux
not itself run on .parallel().runOn(...)
rails.
But when you don't have a someRemoteCall
you can do the trick below (see NOT_MONO_AND_NOT_FLUX
) to ignore the unsafe processing run on .parallel().runOn(...)
rails:
Optional<List<String>> foundImageNames =
Flux.fromStream(this.fileStoreService.walk(path))
.parallel(cpus, cpus)
.runOn(Schedulers.newBoundedElastic(cpus, Integer.MAX_VALUE, "import"), 1)
.flatMap(NOT_MONO_AND_NOT_FLUX -> Mono
.just(NOT_MONO_AND_NOT_FLUX)
.map(path -> sneak(() -> unsafeLocalSimpleProcessingReturningString(path)))
.onErrorContinue(FileNotFoundException.class,
(t, o) -> log.error("File missing:\n{}", t.getMessage()))
)
.collectSortedList(Comparator.naturalOrder())
.blockOptional();
Upvotes: 3
Reputation: 72379
There's three possible ways I generally use to achieve this:
flatMap()
, the second of which is a mapperOnError
-eg. .flatMap(request -> someRemoteCall(), x->Mono.empty(), null)
;onErrorResume(x -> Mono.empty())
as a separate call to ignore any error;.onErrorResume(MyException.class, x -> Mono.empty()))
to just ignore errors of a certain type.The second is what I tend to use by default, as I find that clearest.
Upvotes: 4
Reputation: 9987
There are delay error operators in Reactor. You could write your code as follows:
Flux.fromIterable(actions)
.flatMapDelayError(request -> someRemoteCall(request).subscribeOn(Schedulers.elastic()), 256, 32)
.doOnNext(System.out::println)
.subscribe();
Note that this will still fail your flux in case of any inside publisher emits error, however, it will wait for all inner publishers to finish before doing that.
These operators also require to specify the concurrency and prefetch parameters. In the example I've set them to their default values which is used in regular flatMap calls.
Upvotes: 0
Reputation: 38152
I'm still in the process of learning WebFlux and Reactor, but try one of the onErrorContinue
directly after flatMap
(REST call) to drop (and potentially log) errors.
Upvotes: 0