Reputation: 527
My Rate Limiter is a Global One shown below:
Rate Limiter
@Configuration
internal class RequestRateLimiterConfig(
private val redisRateLimiter: RedisRateLimiter,
private val defaultKeyResolver: KeyResolver
) {
companion object {
const val RATE_LIMITER_ID = "redis-rate-limiter" // default key used by Spring Cloud Gateway
}
private val logger = LoggerFactory.getLogger(RequestRateLimiterConfig::class.java)
@Bean
fun requestRateLimiter(): GlobalFilter {
return GlobalFilter { exchange, chain ->
val keyMono = defaultKeyResolver.resolve(exchange)
keyMono
.flatMap { key ->
if (key.isNullOrEmpty()) {
// if key is null or empty, return error
logger.warn("Empty session ID detected. Sending error response.")
return@flatMap LocalExceptionHandlers.missingKey(exchange)
} else {
// if key is present, continue with rate limiting
logger.info("Resolved key: $key")
redisRateLimiter.isAllowed(RATE_LIMITER_ID, key)
.flatMap { response ->
if (!response.isAllowed) {
logger.warn("Rate limit exceeded for key: $key")
LocalExceptionHandlers.rateLimitExceeded(exchange)
} else {
chain.filter(exchange)
}
}
}
}.then()
}
}
}
/**
* More Rate Limiter configuration
*/
@Configuration
internal class RedisRateLimiterConfig(
private val sessionProperties: SessionProperties
) {
private val logger = LoggerFactory.getLogger(RedisRateLimiterConfig::class.java)
/**
* Redis Rate Limited
*/
@Bean
fun redisRateLimiter(): RedisRateLimiter {
return RedisRateLimiter(10, 20, 1)
}
/**
* Default Key Resolver
*/
@Bean
fun defaultKeyResolver(): KeyResolver {
return KeyResolver { exchange: ServerWebExchange ->
val sessionId = exchange.request.cookies[sessionProperties.SESSION_COOKIE_NAME]?.first()?.value
if (sessionId.isNullOrBlank()) {
logger.warn("No session ID found in cookie.")
Mono.just("")
} else {
logger.info("Resolved session ID for Rate Limiting: $sessionId")
Mono.justOrEmpty(sessionId)
}
}
}
}**
Load Test 1
When I do an Apache Benchmark Load test, I get the following back (as expected)
ab -v 4 -n 25 -c 1 -H 'Cookie: BFF-SESSIONID=BFF-3f3400dd34c22190cebfc18ff8eda96e7d248ea3ef10e52e8a95c4df41a37e14-162166622840154-47453-c244a0f4-9fcc-44ad-b6d8-fdd7aa965d5c' http://127.0.0.1:9090/bff/api/v1/resource/contracts/hello
Concurrency Level: 1
Time taken for tests: 2.429 seconds
Complete requests: 25
Failed requests: 0
Total transferred: 20100 bytes
HTML transferred: 325 bytes
Requests per second: 10.29 [#/sec] (mean)
Time per request: 97.149 [ms] (mean)
Time per request: 97.149 [ms] (mean, across all concurrent requests)
Transfer rate: 8.08 [Kbytes/sec] received
Time Limiter (No. 4 in the below)
However when I add a TimeLimiter to my router, I get very different results (I've isolated this being due to the TimeLimiter)
@Configuration
internal class RoutingConfig(
private val serverProperties: ServerProperties,
private val ignoreFilter: IgnoreFilterConfig,
) {
private val logger = LoggerFactory.getLogger(RoutingConfig::class.java)
@Bean
fun routeLocator(
builder: RouteLocatorBuilder,
tokenRelayGatewayFilterFactory: TokenRelayGatewayFilterFactory,
timeLimiterRegistry: TimeLimiterRegistry,
): RouteLocator {
return builder.routes()
// routing for Resource Server
.route("resource-server") { r ->
r.path("${serverProperties.resourceServerPrefix}/**")
.filters { f ->
// 1. Token relay filter first for authentication
f.filter(tokenRelayGatewayFilterFactory.apply())
// 2. Circuit Breaker before retry
// f.circuitBreaker { circuitBreakerConfig ->
// circuitBreakerConfig.setName("resourceServerCircuitBreaker")
// circuitBreakerConfig.setFallbackUri("forward:/fallback")
// circuitBreakerConfig.setStatusCodes(
// setOf(
// HttpStatus.INTERNAL_SERVER_ERROR.value().toString(), // 500
// HttpStatus.NOT_IMPLEMENTED.value().toString(), // 501
// HttpStatus.BAD_GATEWAY.value().toString(), // 502
// HttpStatus.SERVICE_UNAVAILABLE.value().toString(), // 503
// HttpStatus.GATEWAY_TIMEOUT.value().toString(), // 504
// HttpStatus.HTTP_VERSION_NOT_SUPPORTED.value().toString(), // 505
// HttpStatus.VARIANT_ALSO_NEGOTIATES.value().toString(), // 506
// HttpStatus.INSUFFICIENT_STORAGE.value().toString(), // 507
// HttpStatus.LOOP_DETECTED.value().toString(), // 508
// HttpStatus.BANDWIDTH_LIMIT_EXCEEDED.value().toString(), // 509
// HttpStatus.NOT_EXTENDED.value().toString(), // 510
// HttpStatus.NETWORK_AUTHENTICATION_REQUIRED.value().toString() // 511
// )
// )
// }
// 3. Retry filter before timeout to allow retries
f.retry { retryConfig ->
retryConfig.retries = 3
retryConfig.setMethods(HttpMethod.GET)
retryConfig.setBackoff(
Duration.ofMillis(100),
Duration.ofMillis(1000),
2,
true
)
// add status codes that should trigger retry
retryConfig.setStatuses(
HttpStatus.INTERNAL_SERVER_ERROR, // 500
HttpStatus.NOT_IMPLEMENTED, // 501
HttpStatus.BAD_GATEWAY, // 502
HttpStatus.SERVICE_UNAVAILABLE, // 503
// HttpStatus.GATEWAY_TIMEOUT, // 504
HttpStatus.HTTP_VERSION_NOT_SUPPORTED, // 505
HttpStatus.VARIANT_ALSO_NEGOTIATES, // 506
HttpStatus.INSUFFICIENT_STORAGE, // 507
HttpStatus.LOOP_DETECTED, // 508
HttpStatus.BANDWIDTH_LIMIT_EXCEEDED, // 509
HttpStatus.NOT_EXTENDED, // 510
HttpStatus.NETWORK_AUTHENTICATION_REQUIRED // 511
)
// add exception types that should trigger retry
retryConfig.setExceptions(
IOException::class.java,
// TimeoutException::class.java,
ConnectException::class.java
)
retryConfig.validate()
}
f.filter { exchange, chain ->
val retryCount = exchange.getAttribute<Int>("retry_count") ?: 0
exchange.attributes["retry_count"] = retryCount + 1
logger.warn("Request attempt ${retryCount + 1} for ${exchange.request.uri}")
chain.filter(exchange)
}
// 4. Time limiter after retry
f.filter { exchange, chain ->
// exclude specific static resources from time-out limiter
val requestPath = exchange.request.uri.path
if (ignoreFilter.shouldSkipRequestPath(requestPath)) {
// allow the request to proceed without the time limiter
return@filter chain.filter(exchange)
}
val timeLimiter = timeLimiterRegistry.timeLimiter("resourceServerTimeLimiter")
val startTime = System.currentTimeMillis()
val futureSupplier = Supplier {
chain.filter(exchange)
.doOnSuccess {
val duration = System.currentTimeMillis() - startTime
logger.debug("Request completed within time limit for path: $requestPath, duration: ${duration}ms")
}
.doOnCancel {
// Mark exchange as completed on cancellation
exchange.attributes["cancelled"] = true
}
.toFuture()
}
Mono.fromCallable(timeLimiter.decorateFutureSupplier(futureSupplier))
.onErrorResume { throwable ->
when (throwable) {
is TimeoutException -> {
val duration = System.currentTimeMillis() - startTime
logger.warn("Request timed out for path: $requestPath, duration: ${duration}ms")
// Mark exchange as completed
exchange.attributes["completed"] = true
// First set the status code
Mono.defer {
LocalExceptionHandlers.timeout(exchange)
}
}
is CancellationException -> {
logger.warn("Request cancelled for path: $requestPath")
// Mark exchange as completed
exchange.attributes["completed"] = true
// First set the status code
Mono.defer {
LocalExceptionHandlers.timeout(exchange)
}
}
else -> {
logger.error("Unexpected error for path: $requestPath", throwable)
if (exchange.attributes["completed"] == true ||
exchange.attributes["cancelled"] == true) {
Mono.empty()
} else {
logger.error("Unexpected error for path: $requestPath", throwable)
Mono.error(throwable)
}
}
}
}
.doOnError { throwable ->
if (throwable !is TimeoutException &&
throwable !is CancellationException &&
exchange.attributes["completed"] != true &&
exchange.attributes["cancelled"] != true) {
logger.error("Unhandled error for path: $requestPath", throwable)
}
}
}
// 5. Basic request cleanup
f.removeRequestHeader("Cookie")
}
.uri(serverProperties.resourceServerUri)
}
.build()
}
}
Load Test 2
Then when I run the Apache Benchmarking test (here concurrency is set to 10, but I get the same when it is set to 1)
ab -v 4 -n 25 -c 10 -H 'Cookie: BFF-SESSIONID=BFF-3f3400dd34c22190cebfc18ff8eda96e7d248ea3ef10e52e8a95c4df41a37e14-162166622840154-47453-c244a0f4-9fcc-44ad-b6d8-fdd7aa965d5c' http://127.0.0.1:9090/bff/api/v1/resource/contracts/hello
I get this:
Concurrency Level: 10
Time taken for tests: 21.616 seconds
Complete requests: 25
Failed requests: 23
(Connect: 0, Receive: 0, Length: 23, Exceptions: 0)
Total transferred: 21134 bytes
HTML transferred: 8167 bytes
Requests per second: 1.16 [#/sec] (mean)
Time per request: 8646.540 [ms] (mean)
Time per request: 864.654 [ms] (mean, across all concurrent requests)
Transfer rate: 0.95 [Kbytes/sec] received
I can't figure out where in the TimeLimiter I am going wrong. Is it to do wit Kotlin, and not using a flow or Co-routine?
I spent 3 hours trying to debug this yesterday, but made no progress.
Commenting out the TimeLimiter code, I get back to the original, much faster benchmark results
Can someone please help?
LocalErrorHandlers
For reference here are my LocalExceptionHandlers (just a container for returning custom errors)
/**********************************************************************************************************************/
/*********************************************** LOCAL EXCEPTION HANDLERS *********************************************/
/**********************************************************************************************************************/
internal object LocalExceptionHandlers {
/**********************************************************************************************************************/
/* RATE LIMITER - MISSING KEY ERROR. */
/**********************************************************************************************************************/
/**
* Missing Key error response to handle missing key scenarios
*/
internal fun missingKey(
exchange: ServerWebExchange,
): Mono<Void> {
val response = exchange.response
response.statusCode = HttpStatus.BAD_REQUEST
response.headers.contentType = MediaType.APPLICATION_JSON
response.headers.add("Rate-Limiting-Exception", "Key not resolved")
// create ThrottlingException
val throttlingException = ThrottlingException(
type = URI.create(URIErrorTypes.BAD_REQUEST.type),
httpStatus = HttpStatus.BAD_REQUEST,
title = "Rate Limiting Exception",
message = "Missing key",
cause = null,
ErrorCodeTypes.THROTTLING_EXCEPTION.code,
ErrorCategoryTypes.RATE_LIMITING_ERROR.type,
)
// convert to ProblemDetailsExtended
val problemDetailsExtended = ProblemDetailsExtended(
type = throttlingException.type,
status = throttlingException.httpStatus.value(),
title = throttlingException.title,
detail = throttlingException.message,
instance = null,
code = throttlingException.code,
errorCategory = throttlingException.errorCategory,
errors = listOf(
ErrorDetail(
detail = throttlingException.cause?.message ?: throttlingException.message,
code = throttlingException.code.toString()
)
)
)
// serialize to JSON
val errorResponse = JSONUtilities.objectMapper.writeValueAsString(problemDetailsExtended)
val responseBody = response.bufferFactory().wrap(errorResponse.toByteArray())
return response.writeWith(Mono.just(responseBody))
}
/**********************************************************************************************************************/
/* RATE LIMITER - TOO MANY REQUESTS. */
/**********************************************************************************************************************/
/**
* Missing Key error response to handle missing key scenarios
*/
internal fun rateLimitExceeded(
exchange: ServerWebExchange,
): Mono<Void> {
val response = exchange.response
response.statusCode = HttpStatus.TOO_MANY_REQUESTS
response.headers.contentType = MediaType.APPLICATION_JSON
response.headers.add("Rate-Limiting-Exception", "Too many requests")
// create ThrottlingException
val throttlingException = ThrottlingException(
type = URI.create(URIErrorTypes.TOO_MANY_REQUESTS.type),
httpStatus = HttpStatus.TOO_MANY_REQUESTS,
title = "Rate Limiting Exception",
message = "Too many requests",
cause = null,
ErrorCodeTypes.THROTTLING_EXCEPTION.code,
ErrorCategoryTypes.RATE_LIMITING_ERROR.type,
)
// convert to ProblemDetailsExtended
val problemDetailsExtended = ProblemDetailsExtended(
type = throttlingException.type,
status = throttlingException.httpStatus.value(),
title = throttlingException.title,
detail = throttlingException.message,
instance = null,
code = throttlingException.code,
errorCategory = throttlingException.errorCategory,
errors = listOf(
ErrorDetail(
detail = throttlingException.cause?.message ?: throttlingException.message,
code = throttlingException.code.toString()
)
)
)
// serialize to JSON
val errorResponse = JSONUtilities.objectMapper.writeValueAsString(problemDetailsExtended)
val responseBody = response.bufferFactory().wrap(errorResponse.toByteArray())
return response.writeWith(Mono.just(responseBody))
}
/**********************************************************************************************************************/
/* TIMEOUT LIMITER - TIME LIMIT EXCEEDED. */
/**********************************************************************************************************************/
/**
* Timeout Limiter Error
*/
internal fun timeout(
exchange: ServerWebExchange
): Mono<Void> {
val response = exchange.response
// response.statusCode = HttpStatus.GATEWAY_TIMEOUT
response.headers.contentType = MediaType.APPLICATION_JSON
response.headers.add("Timeout-Exception", "Time limit exceeded")
// create ThrottlingException
val throttlingException = ThrottlingException(
type = URI.create(URIErrorTypes.SERVICE_UNAVAILABLE.type),
httpStatus = HttpStatus.GATEWAY_TIMEOUT,
title = "Timeout Exception",
message = "Time limit exceeded",
cause = null,
ErrorCodeTypes.GENERAL_EXCEPTION.code,
ErrorCategoryTypes.TIME_LIMITING_ERROR.type,
)
// convert to ProblemDetailsExtended
val problemDetailsExtended = ProblemDetailsExtended(
type = throttlingException.type,
status = throttlingException.httpStatus.value(),
title = throttlingException.title,
detail = throttlingException.message,
instance = null,
code = throttlingException.code,
errorCategory = throttlingException.errorCategory,
errors = listOf(
ErrorDetail(
detail = throttlingException.cause?.message ?: throttlingException.message,
code = throttlingException.code.toString()
)
)
)
// serialize to JSON
val errorResponse = JSONUtilities.objectMapper.writeValueAsString(problemDetailsExtended)
val responseBody = response.bufferFactory().wrap(errorResponse.toByteArray())
return response.writeWith(Mono.just(responseBody))
}
}
/**********************************************************************************************************************/
/**************************************************** END OF KOTLIN ***************************************************/
/**********************************************************************************************************************/
Upvotes: 0
Views: 15