Reputation: 30115
I feel like I'm missing some simple, fundamental nuance, but for some reason Mono.delay() is not working for me. I have a Mono that makes an HTTP request that could be throttled. When it is, I need to wait for the provided time before retrying. Here is how it looks now:
internal fun <T> Mono<T>.retryAfter(maxRetries: Int, uri: URI): Mono<T> {
    // `this` is instance of Mono<T>
    return this.retryWhen(Retry.from {
        it.map { rs ->
            val ex = rs.failure()
            if (ex is RetryAfterException && rs.totalRetries() < maxRetries) {
                println("*** API throttling on call to $uri. Will wait for ${ex.delay}. Retry count: ${rs.totalRetries()}. ms ${System.currentTimeMillis()}")
                Mono.delay(ex.delay.plusMillis(500), Schedulers.parallel()).then(Mono.defer({
                    println(" Waited. ${System.currentTimeMillis()}")
                    this
                }))
            } else
                Mono.error(rs.failure())
        }
    })
}
Upvotes: 1
Views: 376
Reputation: 28301
You are using map, which results in a Flux<Mono<?>> being returned to the operator for retry control. From the operator's perspective (Flux<?>), any onNext means "you should retry". Whether it is onNext("example") or onNext(Mono.error(err)) doesn't matter.
Instead of using map, use concatMap. The Mono that you produce in your function will then correctly result in a Flux<?> in which the "delay" branch of the if produces a (delayed) onNext while the other branch produces onError.
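As a rough sketch of that change (not necessarily the exact code the asker ended up with), the extension function could look like the following. RetryAfterException is a hypothetical stand-in here, since the question only shows that it carries a delay property of type Duration; the uri parameter is kept only to match the original signature.

```kotlin
import java.net.URI
import java.time.Duration
import reactor.core.publisher.Mono
import reactor.util.retry.Retry

// Hypothetical stand-in for the question's exception; only `delay` comes from the question.
class RetryAfterException(val delay: Duration) : RuntimeException("throttled")

internal fun <T> Mono<T>.retryAfter(maxRetries: Int, uri: URI): Mono<T> =
    this.retryWhen(Retry.from { signals ->
        // concatMap subscribes to each inner Mono, so its onNext/onError
        // reaches the retry operator instead of the Mono object itself.
        signals.concatMap { rs ->
            val ex = rs.failure()
            if (ex is RetryAfterException && rs.totalRetries() < maxRetries) {
                // Emits one value after the delay, which triggers one resubscription.
                Mono.delay(ex.delay.plusMillis(500))
            } else {
                // Propagates the original failure and stops retrying.
                Mono.error<Long>(ex)
            }
        }
    })
```

There is no need to return this from the delayed branch: the retry operator resubscribes to the source on its own whenever the companion Flux emits a value.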
Upvotes: 1
Reputation: 586
You can use the built-in retry builders.
I suggest using Retry.fixedDelay(), which lets you define a maximum number of retries and a fixed delay between attempts. When the maximum is reached, you'll get a Mono.error().
internal fun <T> Mono<T>.retryAfter(maxRetries: Long, uri: URI): Mono<T> {
    // `this` is instance of Mono<T>
    return this.retryWhen(Retry.fixedDelay(maxRetries, Duration.ofMillis(500))
        .doBeforeRetry { rs: Retry.RetrySignal? -> println("Will retry " + rs?.totalRetries()) }
        .doAfterRetry { rs: Retry.RetrySignal? -> println("Has retried " + rs?.totalRetries()) }
        .onRetryExhaustedThrow { _, rs -> rs.failure() })
}
Upvotes: 1