expert

Reputation: 30095

Why doesn't Flux.flatMap() wait for completion of the inner publisher?

Could you please explain what exactly happens in the Flux/Mono returned by HttpClient.response()? I thought the value produced by the HTTP client would NOT be passed downstream until the Mono completes, but I see that a large number of requests are created at once, which ends with this exception: reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 8. It works as expected (items are processed one by one) if I replace the call to testRequest() with Mono.fromCallable { }.

What am I missing?

Test code:

import org.asynchttpclient.netty.util.ByteBufUtils
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
import reactor.netty.resources.ConnectionProvider

class Test {
    private val client = HttpClient.create(ConnectionProvider.create("meh", 4))

    fun main() {
        Flux.fromIterable(0..99)
                .flatMap { obj ->
                    println("Creating request for: $obj")
                    testRequest()
                            .doOnError { ex ->
                                println("Failed request for: $obj")
                                ex.printStackTrace()
                            }
                            .map { res ->
                                obj to res
                            }
                }
                .doOnNext { (obj, res) ->
                    println("Created request for: $obj ${res.length} characters")
                }
                .collectList().block()!!
    }

    fun testRequest(): Mono<String> {
        return client.get()
                .uri("https://projectreactor.io/docs/netty/release/reference/index.html#_connection_pool")
                .responseContent()
                .reduce(StringBuilder(), { sb, buf ->
                    val str= ByteBufUtils.byteBuf2String(Charsets.UTF_8, buf)
                    sb.append(str)
                })
                .map { it.toString() }
    }
}

Upvotes: 3

Views: 2731

Answers (1)

Violeta Georgieva

Reputation: 2282

When you create the ConnectionProvider with ConnectionProvider.create("meh", 4), you get a connection pool with a maximum of 4 connections and a maximum of 8 pending acquire requests. See the documentation for more about this.

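The documentation for flatMap says: "Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allow them to interleave." See the documentation for more about this.

So flatMap subscribes to the inner publishers eagerly, without waiting for the previous one to complete, which means you are trying to run all the requests simultaneously. Once the 4 connections are in use and 8 more requests are waiting in the queue, every further acquisition fails with PoolAcquirePendingLimitException.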
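The difference can be observed without any HTTP traffic. Below is a minimal sketch, assuming only reactor-core on the classpath; fakeRequest and peakConcurrency are hypothetical helpers invented for this illustration, with Mono.delay standing in for an asynchronous HTTP call. It counts how many inner publishers are subscribed but not yet finished at the same time.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for an HTTP call: emits its value asynchronously after 50 ms.
fun fakeRequest(inFlight: AtomicInteger, peak: AtomicInteger): Mono<String> =
    Mono.delay(Duration.ofMillis(50))
        .doOnSubscribe {
            // One more inner publisher is now running; remember the highest count seen.
            val now = inFlight.incrementAndGet()
            peak.updateAndGet { prev -> maxOf(prev, now) }
        }
        // Runs before the value is handed downstream, so the count drops
        // before the outer operator can subscribe to another inner publisher.
        .doOnNext { inFlight.decrementAndGet() }
        .map { "response" }

// Returns the highest number of inner publishers that were in flight at once.
fun peakConcurrency(useConcatMap: Boolean): Int {
    val inFlight = AtomicInteger()
    val peak = AtomicInteger()
    val source = Flux.range(0, 20)
    val merged =
        if (useConcatMap) source.concatMap { fakeRequest(inFlight, peak) }
        else source.flatMap { fakeRequest(inFlight, peak) }
    merged.blockLast()
    return peak.get()
}

fun main() {
    println("flatMap peak in flight:   ${peakConcurrency(useConcatMap = false)}")
    println("concatMap peak in flight: ${peakConcurrency(useConcatMap = true)}")
}
```

The flatMap variant should report many inner publishers in flight at once, while the concatMap variant should report exactly one. This also suggests why Mono.fromCallable { } appeared to run one by one in the question: the callable finishes synchronously while flatMap is handling that element, so there is never a second inner publisher pending.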

So you have two options:

  • If you want to use flatMap, then increase the maximum number of pending requests.
  • If you want to keep the number of pending requests, consider using, for example, concatMap instead of flatMap. Its documentation says: "Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation." See the documentation for more about this.
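Both options might look roughly like the sketch below. It assumes a Reactor Netty version that provides ConnectionProvider.builder(...) with pendingAcquireMaxCount(...); the value 100 and the function names sequential and bounded are illustrative choices, not part of the original code. The request parameter exists only so the pipelines can be exercised without a network. The last function shows a third variation that is not mentioned above: flatMap with an explicit concurrency argument.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
import reactor.netty.resources.ConnectionProvider

// Option 1: keep flatMap, but let more acquisitions wait in the pending queue.
// 100 is an assumed value, sized to cover the 100 elements in the question.
val provider: ConnectionProvider = ConnectionProvider.builder("meh")
    .maxConnections(4)
    .pendingAcquireMaxCount(100)
    .build()

val client: HttpClient = HttpClient.create(provider)

fun testRequest(): Mono<String> =
    client.get()
        .uri("https://projectreactor.io/docs/netty/release/reference/index.html#_connection_pool")
        .responseContent()
        .aggregate()   // joins all body chunks into a single buffer
        .asString()

// Option 2: concatMap subscribes to the next inner publisher only after
// the previous one has completed, so at most one request is in flight.
fun sequential(request: () -> Mono<String> = ::testRequest): Flux<Pair<Int, String>> =
    Flux.fromIterable(0..99)
        .concatMap { obj -> request().map { res -> obj to res } }

// Variation (an addition, not one of the two options above): flatMap with a
// concurrency limit of 4, so no more than 4 inner publishers run at once.
fun bounded(request: () -> Mono<String> = ::testRequest): Flux<Pair<Int, String>> =
    Flux.fromIterable(0..99)
        .flatMap({ obj -> request().map { res -> obj to res } }, 4)
```

With concatMap the results arrive in source order. With bounded flatMap they may interleave, but the pool is never asked for more than 4 connections at a time, so the pending queue stays empty.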

Upvotes: 5
