I have a multicast sink with a directBestEffort strategy, into which I emit various events.
I would like to create a stream (Flux) from it that buffers up to N events and, on overflow, terminates immediately with an error, for example when the receiver is too slow to process the events.
The problem is that onBackpressureBuffer(4, { println("Overflow on $it") }, BufferOverflowStrategy.ERROR) delivers the buffered elements first and only then signals the error. I would like the error to be signaled at the moment of overflow, that is, when "Overflow on $it" is printed.
Here is a small demo with logs:
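import reactor.core.publisher.BufferOverflowStrategy
import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks
import java.time.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ForkJoinPool

private val sink: Sinks.Many<Int> = Sinks
    .many()
    .multicast()
    .directBestEffort()

// Buffers up to 4 elements for this subscriber; overflow uses the ERROR strategy.
fun createBufferFlux(): Flux<Int> {
    return sink.asFlux()
        .onBackpressureBuffer(4, { println("Overflow on $it") }, BufferOverflowStrategy.ERROR)
}

// Emits 1..20, one element every 50 ms.
fun emitTestEvents() {
    (1..20).forEach {
        sink.tryEmitNext(it)
        Thread.sleep(50)
    }
}

fun main() {
    val completable = CompletableFuture<Unit>()
    createBufferFlux()
        // Slow consumer: one element every 100 ms.
        .delayElements(Duration.ofMillis(100))
        .subscribe(
            { println("Slow flux received: $it") },
            { println("Slow flux error: $it"); completable.complete(Unit) },
            { println("Slow flux completed"); completable.complete(Unit) }
        )
    ForkJoinPool.commonPool().execute {
        emitTestEvents()
    }
    completable.join()
}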
Slow flux received: 1
Slow flux received: 2
Slow flux received: 3
Slow flux received: 4
Slow flux received: 5
Overflow on 11 # I want the slow flux to terminate here
Slow flux received: 6
Slow flux received: 7
Slow flux received: 8
Slow flux received: 9
# but the error is only signaled after the slow flux has processed the buffer
Slow flux error: reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
Also, please note that I do not want to use
Sinks.many()
    .multicast()
    .onBackpressureBuffer<Int>(4, false)
because in that case the buffer would be global to the sink.