Alexey
Alexey

Reputation: 377

Flux interval stops execution when error occurs

I'm using Flux.interval for running scheduled tasks

 Flux
        .interval(Duration.ofSeconds(30))
        .doOnNext(duration -> log.info("Process has been started"))
        .flatMap(duration -> customService.process())
        .onErrorResume(throwable -> Flux.empty())
        .subscribe();

The thing here that if an exception occurs in any place, onErrorResume has been called, but scheduler stops working - it doesn't call customService.process() anymore

I need this to continue calling method every 30 seconds even if there're any errors in any place. Can anybody help me with that?

Upvotes: 1

Views: 2359

Answers (1)

Alex
Alex

Reputation: 5974

In your case .onErrorResume(throwable -> Flux.empty()) would just complete the flow without error signal but the original Flux will be canceled.

You have several options:

Handle error in place

Handle error on the process level. In this case error will be "ignored" and initial Flux will continue. Technically this is the same as wrapping process method in try-catch and ignore error in imperative programming.

Flux.interval(Duration.ofSeconds(30))
    .doOnNext(i -> log.info("Process has been started: {}", i))
    .flatMap(i ->
            process(i)
                    .doOnError(e -> log.error("Error: {}", e.getMessage()))
                    .onErrorResume(e -> Mono.empty())
    );
Process has been started: 0
Process has been started: 1
Process has been started: 2
Process has been started: 3
Error: oops 3
Process has been started: 4
Process has been started: 5
Process has been started: 6
Process has been started: 7

Retry

You can use .retry() operator at the end of the sequence to retry any error or use .retryWhen if you need more granular control.

Flux.interval(Duration.ofSeconds(30))
        .doOnNext(i -> log.info("Process has been started: {}", i))
        .flatMap(i ->
                process(i)
        )
        .retry();

Note that in this case every error will trigger re-subscription to the Flux.interval and the sequence will start over.

Process has been started: 0
Process has been started: 1
Process has been started: 2
Process has been started: 3
Error: oops 3
Process has been started: 0
Process has been started: 1
Process has been started: 2
Process has been started: 3

Upvotes: 4

Related Questions