Reputation: 30095
Could you please explain what exactly happens in Flux/Mono returned by HttpClient.response()
? I thought value generated by http client will NOT be passed downstream until Mono completes but I see that tons of requests are generated which ends up with reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 8
exception. It works as expected (items being processed one by one) if I replace call to testRequest()
with Mono.fromCallable { }
.
What am I missing ?
Test code:
import org.asynchttpclient.netty.util.ByteBufUtils
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
import reactor.netty.resources.ConnectionProvider
class Test {
private val client = HttpClient.create(ConnectionProvider.create("meh", 4))
fun main() {
Flux.fromIterable(0..99)
.flatMap { obj ->
println("Creating request for: $obj")
testRequest()
.doOnError { ex ->
println("Failed request for: $obj")
ex.printStackTrace()
}
.map { res ->
obj to res
}
}
.doOnNext { (obj, res) ->
println("Created request for: $obj ${res.length} characters")
}
.collectList().block()!!
}
fun testRequest(): Mono<String> {
return client.get()
.uri("https://projectreactor.io/docs/netty/release/reference/index.html#_connection_pool")
.responseContent()
.reduce(StringBuilder(), { sb, buf ->
val str= ByteBufUtils.byteBuf2String(Charsets.UTF_8, buf)
sb.append(str)
})
.map { it.toString() }
}
}
Upvotes: 3
Views: 2731
Reputation: 2282
When you create the ConnectionProvider
like this ConnectionProvider.create("meh", 4)
, this means connection pool with max connections 4 and max pending requests 8. See here more about this.
When you use flatMap
this means Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allow them to interleave
See here more about this.
So what happens is that you are trying to run all requests simultaneously.
So you have two options:
flatMap
then increase the number of the pending requests.concatMap
instead of flatMap
, which means Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation
. See more here about this.Upvotes: 5