Spliterash
Spliterash

Reputation: 331

Close the flux immediately if the backpressureBuffer is overflow

I have a multicast sink with a directBestEffort strategy in which I put various events.

I would like to create a thread from it that buffers N number of events, and in case of overflow, immediately closes with an error, for example if the receiver is slow to process the events

The problem is that onBackpressureBuffer(4, { println(“Overflow on $it”) }, BufferOverflowStrategy.ERROR) processes the buffer first, and then throws an error. I would like the error to be thrown at the moment of overflow on $it printing

Here is a small demo with logs

private val sink: Sinks.Many<Int> = Sinks
    .many()
    .multicast()
    .directBestEffort()

fun createBufferFlux(): Flux<Int> {
    return sink.asFlux()
        .onBackpressureBuffer(4, { println("Overflow on $it") }, BufferOverflowStrategy.ERROR)
}

fun emitTestEvents() {
    (1..20).forEach {
        sink.tryEmitNext(it)
        Thread.sleep(50)
    }
}

fun main() {
    val completable = CompletableFuture<Unit>()

    createBufferFlux()
        .delayElements(Duration.ofMillis(100))
        .subscribe(
            { println("Slow flux received: $it") },
            { println("Slow flux error: $it");completable.complete(Unit) },
            { println("Slow flux completed");completable.complete(Unit) }
        )
    ForkJoinPool.commonPool().execute {
        emitTestEvents()
    }

    completable.join()
}
Slow flux received: 1
Slow flux received: 2
Slow flux received: 3
Slow flux received: 4
Slow flux received: 5
Overflow on 11 # im want close slow flux here
Slow flux received: 6
Slow flux received: 7
Slow flux received: 8
Slow flux received: 9
# but error only occured, when slow flux process buffer
Slow flux error: reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)

Also, please note that I would not want to use

Sinks.many()
     .multicast()
     .onBackpressureBuffer<Int>(4, false)

Because in that case, the buffer would be global to the sink

Upvotes: 0

Views: 31

Answers (0)

Related Questions