expert
expert

Reputation: 30115

How do I implement variable backoff in Mono.retry?

I feel like I'm missing some simple fundamental nuance but for some reason Mono.delay() is not working for me. I have Mono that makes http request that could be throttled. I need to wait provided time to retry. Here is how it looks now

internal fun <T> Mono<T>.retryAfter(maxRetries: Int, uri: URI): Mono<T> {
    // `this` is instance of Mono<T>
    return this.retryWhen(Retry.from {
        it.map { rs ->
            val ex = rs.failure()
            if (ex is RetryAfterException && rs.totalRetries() < maxRetries) {
                println("*** API throttling on call to $uri. Will wait for ${ex.delay}. Retry count: ${rs.totalRetries()}. ms ${System.currentTimeMillis()}")
                Mono.delay(ex.delay.plusMillis(500), Schedulers.parallel()).then(Mono.defer({
                    println("   Waited. ${System.currentTimeMillis()}")
                    this
                }))
            } else
                Mono.error(rs.failure())
        }
    })
}

Upvotes: 1

Views: 376

Answers (2)

Simon Basl&#233;
Simon Basl&#233;

Reputation: 28301

you are using map, which results in a Flux<Mono<?>> being returned to the operator for retry control. from the operator's perspective (Flux<?>), any onNext means "you should retry". whether it is onNext("example") or onNext(Mono.error(err)) doesn't matter.

instead of using map, use concatMap. The Mono that you produce in your function will correctly result in a Flux<?> in which the "delay" branch of the if produces (delayed) onNext while the other branch produces onError.

Upvotes: 1

Ckram
Ckram

Reputation: 586

You can use built in retry builders I suggest you to use Retry.fixedDelay() which allow you to define a max retry and a delay between each try. When max retry is reached you'll get a Mono.error()

    internal fun <T> Mono<T>.retryAfter(maxRetries: Long, uri: URI): Mono<T> {
        // `this` is instance of Mono<T>
        return this.retryWhen(Retry.fixedDelay(maxRetries, Duration.ofMillis(500))
                .doBeforeRetry { rs: Retry.RetrySignal? -> println("Will retry " + rs?.totalRetries()) }
                .doAfterRetry{ rs: Retry.RetrySignal? -> println("Has retried " + rs?.totalRetries()) }
                .onRetryExhaustedThrow { _, rs ->  rs.failure()})
    }

Upvotes: 1

Related Questions