ULRAMAN TIGA

Reputation: 28

Are Quarkus REST client parallel connections limited to 256?

I have a client that sends requests to a remote server:

Multi.createFrom()
      .items(
          userInfoList.stream())
      .onItem()
      .transformToUniAndMerge(
           userInfo -> {
              System.out.println( userInfo.toString() );
              // return the Uni so the merge operator can subscribe to it
              return restClientService.aRESTClientService( userInfo );
           }
      )

The REST client:

@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Path("/xxx")
@RegisterRestClient
public interface RestClientService {
    @GET
    @Path("/xxxx")
    Uni<ResultDto<String>> aRESTClientService(UserInfo userInfo);
}

Am I doing something wrong, or is there something that can be configured?

Upvotes: 0

Views: 1777

Answers (1)

A.N.T.

Reputation: 66

I don't know if this will help, but I had a similar issue. I had similar code:

@Scheduled(every = "1s")
void processRequests() {
    
    // assume pendingRequests is an unlimited incoming stream
    Multi.createBy().repeating().supplier(pendingRequests::poll)
    ... // rest client calling each request
}

The throughput of this application was exactly 256, and I thought it was due to some limitation in the client; but no, it was because the stream was overflowing after 256 polls. You might be able to increase the throughput by having multiple Multi streams consuming from userInfoList.stream(). Also, use a counter to see how many items you were able to consume before overflowing; you might find out that overflowing is the issue.
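To make the counter idea concrete without pulling in any reactive library, here is a rough plain-Java sketch. It assumes a bounded buffer of 256 slots as a stand-in for the operator's internal queue, pushes items into it with nobody draining, and counts how many fit before the first overflow. The names OverflowCounterSketch and countAccepted are made up for illustration, and this is not how Mutiny is implemented.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Toy stand-in for a bounded internal queue; hypothetical names, not Mutiny code.
class OverflowCounterSketch {

    /**
     * Pushes `items` values into a buffer of `capacity` slots that nobody drains
     * and returns how many were accepted before the first overflow.
     */
    static int countAccepted(int items, int capacity) {
        ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        // AtomicInteger because, in a real pipeline, the counting callback
        // may run on more than one thread.
        AtomicInteger accepted = new AtomicInteger();
        for (int i = 0; i < items; i++) {
            // offer() returns false instead of blocking when the buffer is full
            if (!buffer.offer(i)) {
                break; // first overflow: stop counting here
            }
            accepted.incrementAndGet();
        }
        return accepted.get();
    }

    public static void main(String[] args) {
        System.out.println(countAccepted(1000, 256)); // prints 256
        System.out.println(countAccepted(100, 256));  // prints 100
    }
}
```

If the counter in your own pipeline stops at a suspiciously round number like 256, that points at buffering or demand rather than at the REST client itself.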

UPDATE

I came across an article that explains the magical 256 number (https://pandepra.medium.com/project-reactors-flatmap-and-backpressure-fba97472d625). I also did some testing to understand how flatMap works. flatMap's request size is 256, so you are stuck with processing at most 256 items at a time. After n items have been processed (sent to downstream subscribers), n more items are requested, always topping up to 256 (flatMap holds an internal queue with a maximum of 256 items). In my first description the stream was overflowing because poll() was pushing more than 256 items. If you want to increase this number you can do:

// increase the concurrency value in `merge` (default is 256)
Multi...
.onItem().transformToUni(n -> Uni.createFrom()...).merge(500)

// OR have multiple streams consuming 
Multi.createBy().merging().streams(List.of(
        // each stream here has a flatMap
        multi1(),
        multi2()
    ))

Not sure if there is any major difference between the two suggestions above, though. Finally, log() is your friend. I have been playing around with log() to understand what each operator is doing in between.
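For what it's worth, the request-and-replenish behaviour described in the update can be modelled in a few lines of plain Java. This is only a single-threaded toy model under my own assumptions (the class MergeDemandSketch and the method maxInFlight are made-up names, and "completing the oldest call first" is a simplification), not the actual operator code. It shows why the number of in-flight calls never exceeds the concurrency value, and why raising that value raises the cap.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of a merge/flatMap operator with a concurrency cap; hypothetical, not Mutiny code.
class MergeDemandSketch {

    /**
     * Simulates merging `items` upstream items with at most `concurrency`
     * inner calls in flight and returns the peak number of in-flight calls.
     */
    static int maxInFlight(int items, int concurrency) {
        if (concurrency < 1) {
            throw new IllegalArgumentException("concurrency must be >= 1");
        }
        Deque<Integer> inFlight = new ArrayDeque<>();
        int nextItem = 0;
        int completed = 0;
        int peak = 0;
        // initial demand: the operator asks upstream for `concurrency` items
        int demand = concurrency;
        while (completed < items) {
            // upstream delivers while there is demand and items remain
            while (demand > 0 && nextItem < items) {
                inFlight.add(nextItem++);
                demand--;
            }
            peak = Math.max(peak, inFlight.size());
            // one inner call finishes, so the operator requests one more item
            inFlight.poll();
            completed++;
            demand++;
        }
        return peak;
    }

    public static void main(String[] args) {
        System.out.println(maxInFlight(1000, 256)); // prints 256
        System.out.println(maxInFlight(1000, 500)); // prints 500
        System.out.println(maxInFlight(100, 256));  // prints 100
    }
}
```

The model keeps demand plus in-flight calls equal to the concurrency value at all times, which is the same bookkeeping the article describes for flatMap.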

Upvotes: 2
