Michał Mielec

Reputation: 1651

How to limit number of open sockets in spring-webflux WebClient?

I have a RESTful service and wanted to write a simple performance benchmark with Reactor and Spring WebClient. The benchmark creates N users and then posts M votes for every created user.

Unfortunately, the following code exceeds the maximum open file limit on my Linux machine, which is 1024 (ulimit -n 1024).

    RestService restService = ...
    int N_ITERATIONS = 100;
    int M_VOTES = 100;

    Flux.range(0, N_ITERATIONS)
            .parallel()
            .runOn(Schedulers.parallel())
            .flatMap(iteration -> restService.postUserRegistration(User.builder().build()))
            .flatMap(user -> Flux.range(0, M_VOTES)
                    .flatMap(vote -> restService.postUserVote(Vote.builder().build()))
                    .collectList()
                    .map(votes -> Tuple.of(user, votes))
            ).doOnNext(userVotes -> log.info("User: {} voted: {}", userVotes._1(), userVotes._2()))
            .sequential()
            .toIterable();

RestService is implemented with the standard WebClient from Spring WebFlux.

Is there a way to limit the number of created sockets based on the system limit?

Stacktrace:

Caused by: io.netty.channel.unix.Errors$NativeIoException: newSocketStream(..) failed: Too many open files
    at io.netty.channel.unix.Errors.newIOException(Errors.java:122) ~[netty-transport-native-unix-common-4.1.27.Final.jar:4.1.27.Final]
    ... 98 common frames omitted

Upvotes: 2

Views: 2553

Answers (1)

ESala

Reputation: 7058

I don't think there is. But you could take steps to prevent it.

First, why is your file descriptor limit so low? Linux opens a file descriptor for each open socket, so 1024 is very low if you intend to have a lot of open sockets simultaneously. I would consider increasing this limit a lot.

Second, you are leaving the concurrency configuration up to the defaults. There is a variant of the flatMap operator that lets you control how many Publishers can be subscribed to and merged in parallel:

    Flux<V> flatMap(
            Function<? super T, ? extends Publisher<? extends V>> mapper,
            int concurrency)

Using the concurrency parameter you would be able to define how many in-flight sequences you want to allow.
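To make that concrete, here is a minimal sketch of the two-argument flatMap. It is not your actual code: fakeRequest is a hypothetical stand-in for restService.postUserVote(...), MAX_IN_FLIGHT is an arbitrary cap chosen for illustration, and the counters exist only to observe how many inner publishers are subscribed at once. It assumes reactor-core is on the classpath.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class BoundedFlatMapSketch {

    // Hypothetical cap; in a real benchmark keep it well below `ulimit -n`.
    static final int MAX_IN_FLIGHT = 8;

    // Counters used only to observe concurrency in this sketch.
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger peak = new AtomicInteger();

    // Hypothetical stand-in for restService.postUserVote(...):
    // "opens" on subscription and completes after 20 ms.
    static Mono<Integer> fakeRequest(int id) {
        return Mono.defer(() -> {
            int now = inFlight.incrementAndGet();
            peak.accumulateAndGet(now, Math::max);
            return Mono.delay(Duration.ofMillis(20))
                    .map(tick -> id)
                    // Runs before completion is propagated to flatMap,
                    // so the slot is counted as free before it is reused.
                    .doOnTerminate(inFlight::decrementAndGet);
        });
    }

    static List<Integer> run(int requests) {
        return Flux.range(0, requests)
                // Second argument: at most MAX_IN_FLIGHT inner publishers
                // are subscribed to at the same time.
                .flatMap(BoundedFlatMapSketch::fakeRequest, MAX_IN_FLIGHT)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        List<Integer> results = run(100);
        System.out.println("completed=" + results.size() + " peak=" + peak.get());
    }
}
```

In your pipeline the equivalent change would be passing a second argument to the inner flatMap, for example .flatMap(vote -> restService.postUserVote(...), 8). Keep in mind that the limits multiply: each parallel rail runs its own flatMap, and the outer and inner flatMap each have their own concurrency, so the total number of open connections can be roughly rails × outer × inner. As far as I know the default concurrency of flatMap is 256, which would explain why the 1024 limit is hit so quickly.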

Upvotes: 3
