Reputation: 377
I'm using Flux.interval
for running scheduled tasks
Flux
.interval(Duration.ofSeconds(30))
.doOnNext(duration -> log.info("Process has been started"))
.flatMap(duration -> customService.process())
.onErrorResume(throwable -> Flux.empty())
.subscribe();
The thing here that if an exception occurs in any place, onErrorResume
has been called, but scheduler stops working - it doesn't call customService.process()
anymore
I need this to continue calling method every 30 seconds even if there're any errors in any place. Can anybody help me with that?
Upvotes: 1
Views: 2359
Reputation: 5974
In your case .onErrorResume(throwable -> Flux.empty())
would just complete the flow without error signal but the original Flux
will be canceled.
You have several options:
Handle error in place
Handle error on the process
level. In this case error will be "ignored" and initial Flux
will continue. Technically this is the same as wrapping process
method in try-catch
and ignore error in imperative programming.
Flux.interval(Duration.ofSeconds(30))
.doOnNext(i -> log.info("Process has been started: {}", i))
.flatMap(i ->
process(i)
.doOnError(e -> log.error("Error: {}", e.getMessage()))
.onErrorResume(e -> Mono.empty())
);
Process has been started: 0
Process has been started: 1
Process has been started: 2
Process has been started: 3
Error: oops 3
Process has been started: 4
Process has been started: 5
Process has been started: 6
Process has been started: 7
Retry
You can use .retry()
operator at the end of the sequence to retry any error or use .retryWhen
if you need more granular control.
Flux.interval(Duration.ofSeconds(30))
.doOnNext(i -> log.info("Process has been started: {}", i))
.flatMap(i ->
process(i)
)
.retry();
Note that in this case every error will trigger re-subscription to the Flux.interval
and the sequence will start over.
Process has been started: 0
Process has been started: 1
Process has been started: 2
Process has been started: 3
Error: oops 3
Process has been started: 0
Process has been started: 1
Process has been started: 2
Process has been started: 3
Upvotes: 4