Felipe Windmoller

Reputation: 1753

Can I configure the pool size of a specific Quarkus Vertx ConsumeEvent?

According to the Quarkus guide "All configuration options", the quarkus.vertx.worker-pool-size property sets the "Thread size of the worker thread pool".

Is it possible to configure the pool size for one specific @ConsumeEvent consumer, such as this one?

    @io.quarkus.vertx.ConsumeEvent(value = "my-consume-event", blocking = true)
    public void start(String value) {
      // do the work
    }

I would like to limit the number of threads that can process events sent to my-consume-event, without changing the global quarkus.vertx.worker-pool-size.

SmallRye Reactive Messaging example of configurable thread pool

The SmallRye Reactive Messaging guide has an example of what I want to do.

There, I can use the @Blocking annotation, give it a pool name, and configure the thread pool under that name:

    @Outgoing("Y")
    @Incoming("X")
    @Blocking("my-custom-pool")
    public String process(String s) {
      return s.toUpperCase();
    }

Specifying the concurrency for the above worker pool requires the following configuration property to be defined:

    smallrye.messaging.worker.my-custom-pool.max-concurrency=3

In this example, I can configure the size of the thread pool named my-custom-pool, which processes the messages.

Thanks

Edit: attempt with @io.smallrye.common.annotation.Blocking("my-custom-pool")

I tried passing a value to the @io.smallrye.common.annotation.Blocking annotation, as in @Blocking("my-custom-pool"), but I get the following compile error:

    The attribute value is undefined for the annotation type Blocking

I'm using this dependency:

    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-vertx</artifactId>
    </dependency>

I also created this project on my GitHub account to reproduce the test.

Class used for the test

    package org.acme;

    import javax.inject.Inject;
    import javax.ws.rs.GET;
    import javax.ws.rs.Path;
    import javax.ws.rs.core.Response;

    import org.jboss.logging.Logger;

    import io.vertx.core.eventbus.EventBus;

    @Path("/hello")
    public class GreetingResource {
        private static final Logger LOG = Logger.getLogger(GreetingResource.class);

        @Inject
        EventBus eventBus;

        @GET
        public Response hello() {
            LOG.info("hello()");
            eventBus.send("my-consume-event", null);
            return Response
                    .status(Response.Status.ACCEPTED)
                    .build();
        }

        @io.quarkus.vertx.ConsumeEvent("my-consume-event")
        // @io.smallrye.common.annotation.Blocking("my-custom-pool")
        @io.smallrye.common.annotation.Blocking
        public void start(String value) {
            try {
                LOG.info("before the sleep");
                Thread.sleep(5000);
                LOG.info("after the sleep");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

Upvotes: 2

Views: 2779

Answers (1)

tsegismont

Reputation: 9128

You should be able to use the @io.smallrye.common.annotation.Blocking annotation:

    @ConsumeEvent("my-consume-event")
    @Blocking("my-custom-pool")
    public void start(String value) {
      // do the work
    }

And configure your pool size in application.properties:

    smallrye.messaging.worker.my-custom-pool.max-concurrency=3

EDIT

Actually, the @io.smallrye.reactive.messaging.annotations.Blocking annotation, which is the one that accepts a pool name, is not supported on methods annotated with @ConsumeEvent.

Also, according to the issue "ConsumeEvent with Blocking Threading on wrong ExecutorService" (#19911), it seems that these events are executed on the default Quarkus executor.
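
Since a named pool cannot be attached to the consumer, one possible workaround is to let the consumer hand the work to a dedicated executor that you size yourself. The following is only a minimal sketch in plain Java, not something Quarkus provides: the class DedicatedPoolSketch and its methods are hypothetical names made up for illustration, and the pool size of 3 simply mirrors the max-concurrency=3 value above. It assumes the @ConsumeEvent method would do nothing except call onEvent(...).

    ```java
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical helper for illustration only; not a Quarkus or Vert.x API.
    final class DedicatedPoolSketch {
        private final ExecutorService pool;
        private final AtomicInteger running = new AtomicInteger();
        private final AtomicInteger peak = new AtomicInteger();

        DedicatedPoolSketch(int poolSize) {
            // At most poolSize tasks run at once; the rest wait in the executor's queue.
            this.pool = Executors.newFixedThreadPool(poolSize);
        }

        // The @ConsumeEvent method would call this instead of doing the work itself.
        void onEvent(Runnable work) {
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max); // record the highest concurrency seen
                try {
                    work.run();
                } finally {
                    running.decrementAndGet();
                }
            });
        }

        int peakConcurrency() {
            return peak.get();
        }

        // Stop accepting work and wait for queued tasks to finish.
        void shutdownAndWait() {
            pool.shutdown();
            try {
                pool.awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        // Submits `events` short blocking tasks and returns the peak concurrency observed.
        static int demo(int poolSize, int events) {
            DedicatedPoolSketch sketch = new DedicatedPoolSketch(poolSize);
            for (int i = 0; i < events; i++) {
                sketch.onEvent(() -> {
                    try {
                        Thread.sleep(50); // stands in for the blocking work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            sketch.shutdownAndWait();
            return sketch.peakConcurrency();
        }

        public static void main(String[] args) {
            System.out.println("peak concurrency: " + demo(3, 10));
        }
    }
    ```

With this approach the consumer itself only enqueues work, so it should not need blocking = true, and the global quarkus.vertx.worker-pool-size stays untouched. The trade-off is that the executor's queue is unbounded here, and in a real application you would have to shut the executor down yourself when the application stops.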

Upvotes: 1
