Sachin
Sachin

Reputation: 527

Spring Cloud Gateway - TimeOut Limiter for some reason slowing down / interacting with Rate Limiter

My Rate Limiter is a Global One shown below:

Rate Limiter

@Configuration
internal class RequestRateLimiterConfig(
    private val redisRateLimiter: RedisRateLimiter,
    private val defaultKeyResolver: KeyResolver
) {
    companion object {
        const val RATE_LIMITER_ID = "redis-rate-limiter"  // default key used by Spring Cloud Gateway
    }

    private val logger = LoggerFactory.getLogger(RequestRateLimiterConfig::class.java)

    @Bean
    fun requestRateLimiter(): GlobalFilter {

        return GlobalFilter { exchange, chain ->
            val keyMono = defaultKeyResolver.resolve(exchange)
            keyMono
                .flatMap { key ->
                    if (key.isNullOrEmpty()) {
                        // if key is null or empty, return error
                        logger.warn("Empty session ID detected. Sending error response.")
                        return@flatMap LocalExceptionHandlers.missingKey(exchange)
                    } else {
                        // if key is present, continue with rate limiting
                        logger.info("Resolved key: $key")
                        redisRateLimiter.isAllowed(RATE_LIMITER_ID, key)
                            .flatMap { response ->
                                if (!response.isAllowed) {
                                    logger.warn("Rate limit exceeded for key: $key")
                                    LocalExceptionHandlers.rateLimitExceeded(exchange)
                                } else {
                                    chain.filter(exchange)
                                }
                            }
                    }
                }.then()
        }
    }
}
/**
 * More Rate Limiter configuration
 */
@Configuration
internal class RedisRateLimiterConfig(
    private val sessionProperties: SessionProperties
) {

    private val logger = LoggerFactory.getLogger(RedisRateLimiterConfig::class.java)

    /**
     * Redis Rate Limited
     */
    @Bean
    fun redisRateLimiter(): RedisRateLimiter {
        return RedisRateLimiter(10, 20, 1)
    }

    /**
     * Default Key Resolver
     */
    @Bean
    fun defaultKeyResolver(): KeyResolver {
        return KeyResolver { exchange: ServerWebExchange ->
            val sessionId = exchange.request.cookies[sessionProperties.SESSION_COOKIE_NAME]?.first()?.value
            if (sessionId.isNullOrBlank()) {
                logger.warn("No session ID found in cookie.")
                Mono.just("")
            } else {
                logger.info("Resolved session ID for Rate Limiting: $sessionId")
                Mono.justOrEmpty(sessionId)
            }
        }
    }

}**

Load Test 1

When I do an Apache Benchmark Load test, I get the following back (as expected)

ab -v 4 -n 25 -c 1 -H 'Cookie: BFF-SESSIONID=BFF-3f3400dd34c22190cebfc18ff8eda96e7d248ea3ef10e52e8a95c4df41a37e14-162166622840154-47453-c244a0f4-9fcc-44ad-b6d8-fdd7aa965d5c' http://127.0.0.1:9090/bff/api/v1/resource/contracts/hello

Concurrency Level:      1
Time taken for tests:   2.429 seconds
Complete requests:      25
Failed requests:        0
Total transferred:      20100 bytes
HTML transferred:       325 bytes
Requests per second:    10.29 [#/sec] (mean)
Time per request:       97.149 [ms] (mean)
Time per request:       97.149 [ms] (mean, across all concurrent requests)
Transfer rate:          8.08 [Kbytes/sec] received

Time Limiter (No. 4 in the below)

However when I add a TimeLimiter to my router, I get very different results (I've isolated this being due to the TimeLimiter)

@Configuration
internal class RoutingConfig(
    private val serverProperties: ServerProperties,
    private val ignoreFilter: IgnoreFilterConfig,
) {

    private val logger = LoggerFactory.getLogger(RoutingConfig::class.java)

    @Bean
    fun routeLocator(
        builder: RouteLocatorBuilder,
        tokenRelayGatewayFilterFactory: TokenRelayGatewayFilterFactory,
        timeLimiterRegistry: TimeLimiterRegistry,
    ): RouteLocator {
        return builder.routes()

            // routing for Resource Server
            .route("resource-server") { r ->
                r.path("${serverProperties.resourceServerPrefix}/**")
                    .filters { f ->

                        // 1. Token relay filter first for authentication
                        f.filter(tokenRelayGatewayFilterFactory.apply())

                        // 2. Circuit Breaker before retry
//                        f.circuitBreaker { circuitBreakerConfig ->
//                            circuitBreakerConfig.setName("resourceServerCircuitBreaker")
//                            circuitBreakerConfig.setFallbackUri("forward:/fallback")
//                            circuitBreakerConfig.setStatusCodes(
//                                setOf(
//                                    HttpStatus.INTERNAL_SERVER_ERROR.value().toString(),            // 500
//                                    HttpStatus.NOT_IMPLEMENTED.value().toString(),                  // 501
//                                    HttpStatus.BAD_GATEWAY.value().toString(),                      // 502
//                                    HttpStatus.SERVICE_UNAVAILABLE.value().toString(),              // 503
//                                    HttpStatus.GATEWAY_TIMEOUT.value().toString(),                  // 504
//                                    HttpStatus.HTTP_VERSION_NOT_SUPPORTED.value().toString(),       // 505
//                                    HttpStatus.VARIANT_ALSO_NEGOTIATES.value().toString(),          // 506
//                                    HttpStatus.INSUFFICIENT_STORAGE.value().toString(),             // 507
//                                    HttpStatus.LOOP_DETECTED.value().toString(),                    // 508
//                                    HttpStatus.BANDWIDTH_LIMIT_EXCEEDED.value().toString(),         // 509
//                                    HttpStatus.NOT_EXTENDED.value().toString(),                     // 510
//                                    HttpStatus.NETWORK_AUTHENTICATION_REQUIRED.value().toString()   // 511
//                                )
//                            )
//                        }

                        // 3. Retry filter before timeout to allow retries
                        f.retry { retryConfig ->
                            retryConfig.retries = 3
                            retryConfig.setMethods(HttpMethod.GET)
                            retryConfig.setBackoff(
                                Duration.ofMillis(100),
                                Duration.ofMillis(1000),
                                2,
                                true
                            )
                            // add status codes that should trigger retry
                            retryConfig.setStatuses(
                                HttpStatus.INTERNAL_SERVER_ERROR,           // 500
                                HttpStatus.NOT_IMPLEMENTED,                 // 501
                                HttpStatus.BAD_GATEWAY,                     // 502
                                HttpStatus.SERVICE_UNAVAILABLE,             // 503
                                // HttpStatus.GATEWAY_TIMEOUT,              // 504
                                HttpStatus.HTTP_VERSION_NOT_SUPPORTED,      // 505
                                HttpStatus.VARIANT_ALSO_NEGOTIATES,         // 506
                                HttpStatus.INSUFFICIENT_STORAGE,            // 507
                                HttpStatus.LOOP_DETECTED,                   // 508
                                HttpStatus.BANDWIDTH_LIMIT_EXCEEDED,        // 509
                                HttpStatus.NOT_EXTENDED,                    // 510
                                HttpStatus.NETWORK_AUTHENTICATION_REQUIRED  // 511
                            )
                            // add exception types that should trigger retry
                            retryConfig.setExceptions(
                                IOException::class.java,
//                                TimeoutException::class.java,
                                ConnectException::class.java
                            )

                            retryConfig.validate()
                        }

                        f.filter { exchange, chain ->
                            val retryCount = exchange.getAttribute<Int>("retry_count") ?: 0
                            exchange.attributes["retry_count"] = retryCount + 1
                            logger.warn("Request attempt ${retryCount + 1} for ${exchange.request.uri}")
                            chain.filter(exchange)
                        }

                        // 4. Time limiter after retry
                        f.filter { exchange, chain ->

                            // exclude specific static resources from time-out limiter
                            val requestPath = exchange.request.uri.path
                            if (ignoreFilter.shouldSkipRequestPath(requestPath)) {
                                // allow the request to proceed without the time limiter
                                return@filter chain.filter(exchange)
                            }

                            val timeLimiter = timeLimiterRegistry.timeLimiter("resourceServerTimeLimiter")
                            val startTime = System.currentTimeMillis()

                            val futureSupplier = Supplier {
                                chain.filter(exchange)
                                .doOnSuccess {
                                    val duration = System.currentTimeMillis() - startTime
                                    logger.debug("Request completed within time limit for path: $requestPath, duration: ${duration}ms")
                                }
                                .doOnCancel {
                                    // Mark exchange as completed on cancellation
                                    exchange.attributes["cancelled"] = true
                                }
                                .toFuture()
                            }

                            Mono.fromCallable(timeLimiter.decorateFutureSupplier(futureSupplier))
                                .onErrorResume { throwable ->
                                    when (throwable) {
                                        is TimeoutException -> {
                                            val duration = System.currentTimeMillis() - startTime
                                            logger.warn("Request timed out for path: $requestPath, duration: ${duration}ms")

                                            // Mark exchange as completed
                                            exchange.attributes["completed"] = true

                                            // First set the status code
                                            Mono.defer {
                                                LocalExceptionHandlers.timeout(exchange)
                                            }
                                        }
                                        is CancellationException -> {
                                            logger.warn("Request cancelled for path: $requestPath")

                                            // Mark exchange as completed
                                            exchange.attributes["completed"] = true

                                            // First set the status code
                                            Mono.defer {
                                                LocalExceptionHandlers.timeout(exchange)
                                            }
                                        }
                                        else -> {
                                            logger.error("Unexpected error for path: $requestPath", throwable)
                                            if (exchange.attributes["completed"] == true ||
                                                exchange.attributes["cancelled"] == true) {
                                                Mono.empty()
                                            } else {
                                                logger.error("Unexpected error for path: $requestPath", throwable)
                                                Mono.error(throwable)
                                            }
                                        }
                                    }
                                }
                                .doOnError { throwable ->
                                    if (throwable !is TimeoutException &&
                                        throwable !is CancellationException &&
                                        exchange.attributes["completed"] != true &&
                                        exchange.attributes["cancelled"] != true) {
                                        logger.error("Unhandled error for path: $requestPath", throwable)
                                    }
                                }
                        }

                        // 5. Basic request cleanup
                        f.removeRequestHeader("Cookie")
                    }
                    .uri(serverProperties.resourceServerUri)
            }
            .build()
    }

}

Load Test 2

Then when I run the Apache Benchmarking test (here concurrency is set to 10, but I get the same when it is set to 1) ab -v 4 -n 25 -c 10 -H 'Cookie: BFF-SESSIONID=BFF-3f3400dd34c22190cebfc18ff8eda96e7d248ea3ef10e52e8a95c4df41a37e14-162166622840154-47453-c244a0f4-9fcc-44ad-b6d8-fdd7aa965d5c' http://127.0.0.1:9090/bff/api/v1/resource/contracts/hello

I get this:


Concurrency Level:      10
Time taken for tests:   21.616 seconds
Complete requests:      25
Failed requests:        23
   (Connect: 0, Receive: 0, Length: 23, Exceptions: 0)
Total transferred:      21134 bytes
HTML transferred:       8167 bytes
Requests per second:    1.16 [#/sec] (mean)
Time per request:       8646.540 [ms] (mean)
Time per request:       864.654 [ms] (mean, across all concurrent requests)
Transfer rate:          0.95 [Kbytes/sec] received

I can't figure out where in the TimeLimiter I am going wrong. Is it to do wit Kotlin, and not using a flow or Co-routine?

I spent 3 hours trying to debug this yesterday, but made no progress.

Commenting out the TimeLimiter code, I get back to the original, much faster benchmark results

Can someone please help?

LocalErrorHandlers

For reference here are my LocalExceptionHandlers (just a container for returning custom errors)

/**********************************************************************************************************************/
/*********************************************** LOCAL EXCEPTION HANDLERS *********************************************/
/**********************************************************************************************************************/

internal object LocalExceptionHandlers {

/**********************************************************************************************************************/
/* RATE LIMITER - MISSING KEY ERROR. */
/**********************************************************************************************************************/

    /**
     * Missing Key error response to handle missing key scenarios
     */
    internal fun missingKey(
        exchange: ServerWebExchange,
    ): Mono<Void> {

        val response = exchange.response
        response.statusCode = HttpStatus.BAD_REQUEST
        response.headers.contentType = MediaType.APPLICATION_JSON
        response.headers.add("Rate-Limiting-Exception", "Key not resolved")

        // create ThrottlingException
        val throttlingException = ThrottlingException(
            type = URI.create(URIErrorTypes.BAD_REQUEST.type),
            httpStatus = HttpStatus.BAD_REQUEST,
            title = "Rate Limiting Exception",
            message = "Missing key",
            cause = null,
            ErrorCodeTypes.THROTTLING_EXCEPTION.code,
            ErrorCategoryTypes.RATE_LIMITING_ERROR.type,
        )

        // convert to ProblemDetailsExtended
        val problemDetailsExtended = ProblemDetailsExtended(
            type = throttlingException.type,
            status = throttlingException.httpStatus.value(),
            title = throttlingException.title,
            detail = throttlingException.message,
            instance = null,
            code = throttlingException.code,
            errorCategory = throttlingException.errorCategory,
            errors = listOf(
                ErrorDetail(
                    detail = throttlingException.cause?.message ?: throttlingException.message,
                    code = throttlingException.code.toString()
                )
            )
        )

        // serialize to JSON
        val errorResponse = JSONUtilities.objectMapper.writeValueAsString(problemDetailsExtended)
        val responseBody = response.bufferFactory().wrap(errorResponse.toByteArray())

        return response.writeWith(Mono.just(responseBody))
    }

/**********************************************************************************************************************/
/* RATE LIMITER - TOO MANY REQUESTS. */
/**********************************************************************************************************************/

    /**
     * Missing Key error response to handle missing key scenarios
     */
    internal fun rateLimitExceeded(
        exchange: ServerWebExchange,
    ): Mono<Void> {

        val response = exchange.response
        response.statusCode = HttpStatus.TOO_MANY_REQUESTS
        response.headers.contentType = MediaType.APPLICATION_JSON
        response.headers.add("Rate-Limiting-Exception", "Too many requests")

        // create ThrottlingException
        val throttlingException = ThrottlingException(
            type = URI.create(URIErrorTypes.TOO_MANY_REQUESTS.type),
            httpStatus = HttpStatus.TOO_MANY_REQUESTS,
            title = "Rate Limiting Exception",
            message = "Too many requests",
            cause = null,
            ErrorCodeTypes.THROTTLING_EXCEPTION.code,
            ErrorCategoryTypes.RATE_LIMITING_ERROR.type,
        )

        // convert to ProblemDetailsExtended
        val problemDetailsExtended = ProblemDetailsExtended(
            type = throttlingException.type,
            status = throttlingException.httpStatus.value(),
            title = throttlingException.title,
            detail = throttlingException.message,
            instance = null,
            code = throttlingException.code,
            errorCategory = throttlingException.errorCategory,
            errors = listOf(
                ErrorDetail(
                    detail = throttlingException.cause?.message ?: throttlingException.message,
                    code = throttlingException.code.toString()
                )
            )
        )

        // serialize to JSON
        val errorResponse = JSONUtilities.objectMapper.writeValueAsString(problemDetailsExtended)
        val responseBody = response.bufferFactory().wrap(errorResponse.toByteArray())

        return response.writeWith(Mono.just(responseBody))
    }

/**********************************************************************************************************************/
/* TIMEOUT LIMITER - TIME LIMIT EXCEEDED. */
/**********************************************************************************************************************/

    /**
     * Timeout Limiter Error
     */
    internal fun timeout(
        exchange: ServerWebExchange
    ): Mono<Void> {

        val response = exchange.response
//        response.statusCode = HttpStatus.GATEWAY_TIMEOUT
        response.headers.contentType = MediaType.APPLICATION_JSON
        response.headers.add("Timeout-Exception", "Time limit exceeded")

        // create ThrottlingException
        val throttlingException = ThrottlingException(
            type = URI.create(URIErrorTypes.SERVICE_UNAVAILABLE.type),
            httpStatus = HttpStatus.GATEWAY_TIMEOUT,
            title = "Timeout Exception",
            message = "Time limit exceeded",
            cause = null,
            ErrorCodeTypes.GENERAL_EXCEPTION.code,
            ErrorCategoryTypes.TIME_LIMITING_ERROR.type,
        )

        // convert to ProblemDetailsExtended
        val problemDetailsExtended = ProblemDetailsExtended(
            type = throttlingException.type,
            status = throttlingException.httpStatus.value(),
            title = throttlingException.title,
            detail = throttlingException.message,
            instance = null,
            code = throttlingException.code,
            errorCategory = throttlingException.errorCategory,
            errors = listOf(
                ErrorDetail(
                    detail = throttlingException.cause?.message ?: throttlingException.message,
                    code = throttlingException.code.toString()
                )
            )
        )

        // serialize to JSON
        val errorResponse = JSONUtilities.objectMapper.writeValueAsString(problemDetailsExtended)
        val responseBody = response.bufferFactory().wrap(errorResponse.toByteArray())

        return response.writeWith(Mono.just(responseBody))

    }
}

/**********************************************************************************************************************/
/**************************************************** END OF KOTLIN ***************************************************/
/**********************************************************************************************************************/

Upvotes: 0

Views: 15

Answers (0)

Related Questions