I'm running a stress test on a Vert.x application, sending ~10K RPS. The application sends asynchronous HTTP requests from a dedicated verticle using the Vert.x HTTP client. For about the first 20 seconds the requests are sent successfully. After that, I start getting a lot of "Cannot assign requested address" errors.
I tried deploying more verticles and setting different values for the HTTP client thread pool, but nothing solved the issue. I guess the issue is related to the high throughput over a short period, around 1 minute.
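A rough back-of-the-envelope check of that guess, as a sketch only. "Cannot assign requested address" usually means the OS has no free local (ephemeral) port for a new outgoing connection. The numbers below are assumptions, not measurements from this application: a Linux host with the default `ip_local_port_range` of 32768-60999, closed connections lingering in TIME_WAIT for 60 seconds, and all requests going to a single destination address and port. The class and method names are hypothetical.

```java
// Sketch: how long an ephemeral port range lasts when new outgoing
// connections are opened faster than closed ones leave TIME_WAIT.
final class EphemeralPortBudget {
    // Assumed Linux defaults (not taken from the question).
    static final int PORT_RANGE_LOW = 32768;
    static final int PORT_RANGE_HIGH = 60999;
    static final int TIME_WAIT_SECONDS = 60;

    // Number of local ports available for outgoing connections.
    static int usablePorts() {
        return PORT_RANGE_HIGH - PORT_RANGE_LOW + 1;
    }

    // New connections per second that can be sustained indefinitely
    // if every closed connection holds its port for TIME_WAIT_SECONDS.
    static double sustainableNewConnectionsPerSecond() {
        return (double) usablePorts() / TIME_WAIT_SECONDS;
    }

    // Seconds until all ports are used, assuming no port is released
    // within that window (true when the result is below TIME_WAIT_SECONDS).
    static double secondsUntilExhaustion(double newConnectionsPerSecond) {
        return usablePorts() / newConnectionsPerSecond;
    }

    // New-connection rate that would exhaust the range in the observed time.
    static double impliedNewConnectionsPerSecond(double observedSeconds) {
        return usablePorts() / observedSeconds;
    }

    public static void main(String[] args) {
        System.out.printf("usable ports: %d%n", usablePorts());
        System.out.printf("sustainable new connections/s: %.1f%n",
                sustainableNewConnectionsPerSecond());
        System.out.printf("seconds to exhaustion at 10000 new connections/s: %.2f%n",
                secondsUntilExhaustion(10_000));
        System.out.printf("new connections/s implied by ~20 s to failure: %.0f%n",
                impliedNewConnectionsPerSecond(20));
    }
}
```

Under these assumptions, failing after about 20 seconds would be consistent with only part of the ~10K RPS opening a fresh connection, which still far exceeds what the port range can sustain. Checking the number of sockets in TIME_WAIT on the host during the test (for example with `ss` or `netstat`) would confirm or rule this out.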
Main Class:
public static void main(String[] args) {
    final VertxOptions vertxOptions = new VertxOptions()
            .setMaxEventLoopExecuteTime(1)
            .setMaxEventLoopExecuteTimeUnit(TimeUnit.MILLISECONDS);
    final Vertx vertx = Vertx.vertx(vertxOptions);
    final Injector injector = Guice.createInjector(new Dependencies(vertx));
    CustomCodecRegister.register(vertx.eventBus());
    final Stream<Future<String>> deploymentFutures = Stream.of(
            deployWorker(vertx, injector, StatsHttpVerticle.class, 10)
    ).flatMap(stream -> stream);
    CompositeFuture.all(deploymentFutures.collect(Collectors.toList()))
            .onSuccess(successfulCompositeFuture -> { });
}

private static <T> Stream<Future<String>> deployWorker(Vertx vertx, Injector injector, Class<T> workerVerticleClass, int numVerticles) {
    final String poolName = workerVerticleClass.getSimpleName()
            .toLowerCase()
            .replace("verticle", "-worker-pool");
    final int numberOfThreads = 50;
    final DeploymentOptions options = new DeploymentOptions()
            .setWorker(true)
            .setWorkerPoolName(poolName)
            .setWorkerPoolSize(numberOfThreads);
    return IntStream.range(0, numVerticles)
            .mapToObj(ignore -> Future.future((Promise<String> promise) ->
                    vertx.deployVerticle((Verticle) injector.getInstance(workerVerticleClass), options, promise)));
}
EventBusAdapter:
public void send(Map<String, Object> queryParams, HashMap<String, String> headers, boolean followRedirect, Event eventToFire) {
    StatsRequest statsRequest = new StatsRequest(queryParams, headers, eventToFire, followRedirect);
    eventBus.request(FIRE_GET_METRIC_TO_STATS, statsRequest);
}
WorkerVerticle:
@Override
public void start(Promise<Void> startPromise) throws Exception {
    vertx.eventBus().consumer(FIRE_GET_METRIC_TO_STATS, this::fire);
    startPromise.complete();
}

private void fire(Message<StatsRequest> message) {
    StatsRequest body = message.body();
    MultiMap multimapHeader = MultiMap.caseInsensitiveMultiMap();
    WebClientOptions webClientOptions = new WebClientOptions();
    webClientOptions.setMaxPoolSize(1000);
    WebClient httpClient = WebClient.create(vertx, webClientOptions);
    httpClient.request(HttpMethod.GET, port, "example.com", "/1x1.gif" + "?" + "queryParamsString")
            .followRedirects(false)
            .putHeaders(multimapHeader)
            .timeout(120000)
            .send()
            .onSuccess(response -> {
                logger.info("All good");
            })
            .onFailure(err -> {
                logger.error("Exception: " + err.getMessage());
            });
}
How can I solve this issue?