Reputation: 28
I have a client to request remote server
Multi.createFrom()
.items(
userInfoList.stream())
.onItem()
.transformToUniAndMerge(
userInfo -> {
System.out.println( personInfo.toString() );
restClientService.aRESTClientService( userInfo );
}
)
rest client :
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Path("/xxx")
@RegisterRestClient
public interface RestClientService {
@GET
@Path("/xxxx")
Uni<ResultDto<String>> aRESTClientService(UserInfo userInfo);
}
am I doing something wrong ? or is there something that can be configured
Upvotes: 0
Views: 1777
Reputation: 66
I don't know if this will help, but I had a similar issue. I had similar code:
@Scheduled(every = "1s")
void processRequests() {
// assume pendingRequests is an unlimited incoming stream
Multi.createBy().repeating().supplier(pendingRequests::poll)
... // rest client calling each request
}
The throughput of this application was exactly 256, and I thought it was due to some limitation in the client; but no, it was because the stream was overflowing after 256 polls. You might be able to increase the throughput by having multiple Multi
streams consuming from userInfoList.stream()
. Also, use a counter to see how many items you were able to consume before overflowing; you might find out that overflowing is the issue.
A came across an article that explained the magical 256 number (https://pandepra.medium.com/project-reactors-flatmap-and-backpressure-fba97472d625). I Also did some testing to understand how flatMap
works. So flatMap
's request number is 256, so you are stuck with processing at most 256 items. Also, after n items have been processed (sent to downstream subscribers), n items will be requested again, always up to 256 (flatMap
holds an internal queue with max of 256 items). In my first description I was overflowing since poll()
was pushing more than 256 items. If you want to increase this number you can do:
// increase the concurrency value in `merge` (default is 256)
Multi...
.onItem().transformToUni(n -> Uni.createFrom()...).merge(500)
// OR have multiple streams consuming
Multi.createBy().merging().streams(List.of(
// each stream here has a flatMap
multi1(),
multi2()
))
Not sure if there is any major difference with my suggestions above though. Finally, log()
is you friend. I have been playing around with log()
to understand what each operator is doing in between.
Upvotes: 2