Reputation: 814
I am building a spring boot application using Spring Webflux and I want to make the application fully non-blocking. The application itself has some REST endpoints and a batch job that needs to run every few seconds. For the batch job, I am trying to Flux.interval(Duration.ofMillis(1000))
to generate long values which I ignore and run my scheduled job.
Flux.interval(Duration.ofMillis(1000))
.flatMap(ignore -> doSomething())
.subscribe();
However after some time I get the error
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit tick 257 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
Can someone tell me how to overcome this issue ?
Upvotes: 8
Views: 7007
Reputation: 9947
The cause of the issue is most likely that the doSomething()
operation takes longer than the specified Flux interval, which means that after some time doSomething
jobs overlap each other and backpressure kicks in. Since Flux.interval
is a hot source (meaning it doesn't emit signals on demand) and flatMap
has a default limit of concurrency (256), the operator gets overwhelmed and this results in an OverflowException
.
Based on your requirements, there a couple of potential solutions for this problem:
This means sometimes, we skip a second and don't schedule a job for the interval if we already have a lot (256) in progress.
Flux.interval(Duration.ofMillis(1000))
.onBackpressureDrop()
.flatMap(ignore -> doSomething())
flatMap
concurrency to a higher valueThis can still result in OverflowException after some time, but it delays the problem (probably not the best solution).
Flux.interval(Duration.ofMillis(1000))
.flatMap(ignore -> doSomething(), Integer.MAX_VALUE)
We switch from a hot source to a cold source which eliminates the possibility of overflow. However, we lose the guarantee of scheduling an event every second. Instead, they will be scheduled on demand when previous job finished and at least 1 second elapsed.
Mono.just(1).repeat() // infinite Flux with backpressure
.delayElements(Duration.ofMillis(1000))
.concatMap(ignore -> doSomething())
You can also combine this solution with the previous one if you are fine with overlapping jobs and define a reasonable concurrency level in the flatMap
call.
Upvotes: 23