Reputation: 1753
The parameter quarkus.vertx.worker-pool-size allows me to configure the "Thread size of the worker thread pool", according to the Quarkus guide "All configuration options".
Is it possible to configure the pool size for one specific Quarkus ConsumeEvent method, like this one?
@io.quarkus.vertx.ConsumeEvent(value = "my-consume-event", blocking = true)
public void start(String value) {
    // do the work
}
I would like to set the number of threads that can process this my-consume-event without changing the global quarkus.vertx.worker-pool-size.
The SmallRye Reactive Messaging guide has an example of what I want to do.
There, I can use the Blocking annotation, give the worker pool a name, and configure that pool:
@Outgoing("Y")
@Incoming("X")
@Blocking("my-custom-pool")
public String process(String s) {
    return s.toUpperCase();
}
Specifying the concurrency for the above worker pool requires the following configuration property to be defined:
smallrye.messaging.worker.my-custom-pool.max-concurrency=3
In this example, I can configure the size of the thread pool named my-custom-pool, which processes the messages.
Thanks
I tried to pass a value to the @io.smallrye.common.annotation.Blocking annotation, as in @io.smallrye.common.annotation.Blocking("my-custom-pool"), but I get the following error:
The attribute value is undefined for the annotation type Blocking
I'm using this dependency:
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-vertx</artifactId>
</dependency>
I also created this project on my GitHub account to test this:
package org.acme;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;

import org.jboss.logging.Logger;

import io.vertx.core.eventbus.EventBus;

@Path("/hello")
public class GreetingResource {

    private static final Logger LOG = Logger.getLogger(GreetingResource.class);

    @Inject
    EventBus eventBus;

    @GET
    public Response hello() {
        LOG.info("hello()");
        eventBus.send("my-consume-event", null);
        return Response
                .status(Response.Status.ACCEPTED)
                .build();
    }

    @io.quarkus.vertx.ConsumeEvent("my-consume-event")
    // @io.smallrye.common.annotation.Blocking("my-custom-pool")
    @io.smallrye.common.annotation.Blocking
    public void start(String value) {
        try {
            LOG.info("before the sleep");
            Thread.sleep(5000);
            LOG.info("after the sleep");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Upvotes: 2
Views: 2779
Reputation: 9128
You should be able to use the @io.smallrye.common.annotation.Blocking annotation:
@ConsumeEvent("my-consume-event")
@Blocking("my-custom-pool")
public void start(String value) {
    // do the work
}
And configure your pool size in application.properties:
smallrye.messaging.worker.my-custom-pool.max-concurrency=3
EDIT
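Actually, the @io.smallrye.reactive.messaging.annotations.Blocking annotation (the one that accepts a worker pool name) is not supported on methods annotated with @ConsumeEvent. The @io.smallrye.common.annotation.Blocking annotation has no value attribute, which explains the compile error in the question.
Also, according to the Quarkus issue "ConsumeEvent with Blocking Threading on wrong ExecutorService" (#19911), it seems blocking events are executed on the default Quarkus executor.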
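Since a named pool cannot be attached to @ConsumeEvent, one possible workaround is to hand the work off to a dedicated, fixed-size executor that you own. The sketch below uses only the JDK and is not a Quarkus API: the class BoundedEventWorker and the methods onEvent and runDemo are hypothetical names chosen for illustration, and the 50 ms sleep stands in for the real blocking work. In a Quarkus application, the body of the @ConsumeEvent method would presumably just call something like onEvent and return.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: limits how many events are processed at the same time. */
public class BoundedEventWorker {

    private final ExecutorService pool;
    private final AtomicInteger running = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    public BoundedEventWorker(int maxConcurrency) {
        // A fixed pool caps concurrency; extra events wait in the pool's queue.
        this.pool = Executors.newFixedThreadPool(maxConcurrency);
    }

    /** Would be called from the event handler: submits the work and returns at once. */
    public void onEvent(String value, CountDownLatch done) {
        pool.submit(() -> {
            int now = running.incrementAndGet();
            peak.accumulateAndGet(now, Math::max); // record the highest concurrency seen
            try {
                Thread.sleep(50); // placeholder for the blocking work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running.decrementAndGet();
                done.countDown();
            }
        });
    }

    public int peakConcurrency() {
        return peak.get();
    }

    public void shutdown() {
        pool.shutdown();
    }

    /** Sends the given number of events and returns the peak concurrency observed. */
    public static int runDemo(int events, int maxConcurrency) {
        BoundedEventWorker worker = new BoundedEventWorker(maxConcurrency);
        CountDownLatch done = new CountDownLatch(events);
        for (int i = 0; i < events; i++) {
            worker.onEvent("event-" + i, done);
        }
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            worker.shutdown();
        }
        return worker.peakConcurrency();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + runDemo(10, 3));
    }
}
```

With this approach the handler itself no longer needs to be blocking, because it only enqueues work. The trade-off is that you manage the executor's lifecycle yourself, and the pool size comes from your own code or configuration, not from a smallrye.messaging.worker property.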
Upvotes: 1