DBS

Reputation: 814

Project Reactor - How to handle OverflowException from Flux.interval?

I am building a Spring Boot application using Spring WebFlux and I want to make the application fully non-blocking. The application has some REST endpoints and a batch job that needs to run every few seconds. For the batch job, I am trying to use Flux.interval(Duration.ofMillis(1000)) to generate long values, which I ignore, and run my scheduled job on each tick.

Flux.interval(Duration.ofMillis(1000))
    .flatMap(ignore -> doSomething())
    .subscribe();

However, after some time I get the following error:

reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit tick 257 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)

Can someone tell me how to overcome this issue?

Upvotes: 8

Views: 7007

Answers (1)

Martin Tarjányi

Reputation: 9947

The most likely cause is that doSomething() takes longer than the interval period, so after some time the jobs overlap and backpressure kicks in. Flux.interval emits ticks on a timer rather than on demand (in that sense it behaves like a hot source), and flatMap by default runs at most 256 inner publishers concurrently, requesting another tick only when one of them completes. Once 256 jobs are in flight, the next tick arrives with no outstanding request, and the interval fails with an OverflowException.
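
The mechanism described above can be reproduced in a few seconds with a small sketch. Everything here is illustrative: the class name, `slowJob()` (a stand-in for doSomething() that never completes), the 10 ms period, and the concurrency of 2 (instead of the default 256) are assumptions chosen only to make the overflow happen quickly.

```java
import java.time.Duration;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class IntervalOverflowDemo {

    // Hypothetical stand-in for doSomething(): a job that never finishes.
    static Mono<Void> slowJob() {
        return Mono.never();
    }

    // Runs the pipeline and returns the error that terminated it,
    // or null if it somehow completed without one.
    static Throwable runUntilError() {
        try {
            Flux.interval(Duration.ofMillis(10))
                // Concurrency 2: flatMap requests only 2 ticks up front and
                // asks for another one only when an inner job completes.
                .flatMap(tick -> slowJob(), 2)
                // Safety net: give up after 5 seconds instead of hanging.
                .blockLast(Duration.ofSeconds(5));
            return null;
        } catch (RuntimeException e) {
            return e;
        }
    }

    public static void main(String[] args) {
        Throwable error = runUntilError();
        System.out.println("overflow: " + Exceptions.isOverflow(error));
        if (error != null) {
            System.out.println(error.getMessage());
        }
    }
}
```

Because the jobs never complete, flatMap never requests a third tick, so the interval has nowhere to put it and signals the overflow. With the default concurrency the same thing happens, only later.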

Depending on your requirements, there are a couple of potential solutions for this problem:

1. Ignore the overflow error and drop the signals which would overflow

This means that we sometimes skip a tick and do not schedule a job for that second if we already have a lot of jobs (256) in progress.

Flux.interval(Duration.ofMillis(1000))
    .onBackpressureDrop()
    .flatMap(ignore -> doSomething())
    .subscribe();
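
To see how many ticks are actually skipped, onBackpressureDrop also accepts a callback that is invoked for each dropped element. The following is a minimal sketch under assumed values: the class name, `slowJob()` (a 200 ms stand-in for doSomething()), the 10 ms period, and the concurrency of 2 are hypothetical and only there to make drops visible quickly.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class DropTicksDemo {

    // Hypothetical stand-in for doSomething(): a job that takes 200 ms.
    static Mono<Void> slowJob() {
        return Mono.delay(Duration.ofMillis(200)).then();
    }

    // Runs the pipeline for the given duration and returns how many ticks were dropped.
    static long countDropped(Duration runFor) {
        AtomicLong dropped = new AtomicLong();
        Flux.interval(Duration.ofMillis(10))
            // Ticks that arrive while flatMap has no capacity are handed to
            // this callback and discarded instead of causing an overflow error.
            .onBackpressureDrop(tick -> dropped.incrementAndGet())
            .flatMap(tick -> slowJob(), 2)
            // Stop the otherwise infinite pipeline after runFor.
            .take(runFor)
            .blockLast();
        return dropped.get();
    }

    public static void main(String[] args) {
        System.out.println("dropped ticks: " + countDropped(Duration.ofSeconds(1)));
    }
}
```

In a real application the callback would more likely log or increment a metric, which makes it visible when the job is consistently slower than the schedule.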

2. Set flatMap concurrency to a higher value

This can still result in OverflowException after some time, but it delays the problem (probably not the best solution).

Flux.interval(Duration.ofMillis(1000))
    .flatMap(ignore -> doSomething(), Integer.MAX_VALUE)
    .subscribe();

3. Don't let jobs overlap each other

We switch from a hot source to a cold, demand-driven source, which eliminates the possibility of overflow. However, we lose the guarantee of scheduling an event every second. Instead, a job is scheduled on demand, once the previous job has finished and at least 1 second has elapsed.

Mono.just(1).repeat() // infinite Flux with backpressure
    .delayElements(Duration.ofMillis(1000))
    .concatMap(ignore -> doSomething())
    .subscribe();

You can also combine this solution with the previous one if you are fine with overlapping jobs and define a reasonable concurrency level in the flatMap call.
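
A sketch of that combination might look like the following: the demand-driven source from option 3 feeds a flatMap with an explicit concurrency limit, so jobs may overlap but only up to that limit. The class name, `job()` (a 100 ms stand-in for doSomething()), the 10 ms delay, and the counters used to observe the overlap are all assumptions made for illustration.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class BoundedOverlapDemo {

    static final AtomicInteger running = new AtomicInteger();
    static final AtomicInteger maxRunning = new AtomicInteger();

    // Hypothetical stand-in for doSomething(): takes 100 ms and
    // records how many jobs are in flight at the same time.
    static Mono<Void> job() {
        return Mono.delay(Duration.ofMillis(100))
            .doOnSubscribe(s -> maxRunning.accumulateAndGet(running.incrementAndGet(), Math::max))
            .doFinally(signal -> running.decrementAndGet())
            .then();
    }

    // Runs the pipeline for runFor and returns the highest number of overlapping jobs seen.
    static int maxConcurrentJobs(int concurrency, Duration runFor) {
        running.set(0);
        maxRunning.set(0);
        Mono.just(1).repeat()                       // infinite Flux that respects demand
            .delayElements(Duration.ofMillis(10))   // at least 10 ms between ticks
            .flatMap(ignore -> job(), concurrency)  // overlap allowed, but bounded
            .take(runFor)
            .blockLast();
        return maxRunning.get();
    }

    public static void main(String[] args) {
        System.out.println("max overlapping jobs: " + maxConcurrentJobs(3, Duration.ofSeconds(1)));
    }
}
```

Since the source only emits when flatMap asks for more, a slow job slows down scheduling instead of raising an error, and the number of jobs in flight should never exceed the configured limit.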

Upvotes: 23
