vvra

Reputation: 2912

Quarkus Kafka concurrency

I use Quarkus to process incoming messages from a Kafka topic that has 4 partitions. When I publish multiple messages to this topic, the Quarkus Kafka consumer code processes them one by one, sequentially.

Below is my logic that processes the incoming messages. Since the topic has 4 partitions, shouldn't the processEvent method handle 4 messages in parallel, even though I run only one instance of this consumer app? Am I missing anything?

@Incoming("topic-1")
@Retry(delay = 3, maxRetries = 3, retryOn = { MyException.class })
@NonBlocking
public CompletionStage<Void> processEvent(KafkaRecordBatch<String, String> payload) {
    // Process the batch as a stream; the subscription is fire-and-forget.
    Multi.createFrom().iterable(payload.getRecords())
            .onItem().transform(EventProcessor1::newLogEvent)
            .filter(l -> !l.isEmptyEvent())
            .onItem().transform(this::processEvent)
            .onItem().transformToUniAndConcatenate(this::process2)
            .collect().with(Collectors.counting())
            .subscribe().with(c -> {
                log.info("completed {} events", c);
            });

    // Acknowledge the whole batch; this does not wait for the pipeline above.
    return payload.ack().whenCompleteAsync((s, e) -> {
        if (e != null) {
            throw new RuntimeException(e);
        }
        log.debug("batch processed & committed successfully");
    });
}

Upvotes: 0

Views: 1482

Answers (1)

Ladicek

Reputation: 6577

You can configure the number of partitions that should be consumed concurrently. The configuration looks like this:

mp.messaging.incoming.my-channel.connector=smallrye-kafka
mp.messaging.incoming.my-channel.xxx=xxx
mp.messaging.incoming.my-channel.partitions=4

The partitions configuration property defaults to 1, so a single consumer reads all four partitions; that is the sequential behavior you are seeing.
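Applied to the channel from the question, the setting might look like the sketch below. It assumes the channel is named topic-1 (taken from the @Incoming annotation) and that the topic really has 4 partitions; any other properties your channel already uses stay as they are.

```properties
# Assumption: the channel name matches @Incoming("topic-1") in the question.
mp.messaging.incoming.topic-1.connector=smallrye-kafka
# Create one Kafka consumer per partition of the 4-partition topic.
mp.messaging.incoming.topic-1.partitions=4
```

With this value the connector creates four consumers inside the single application instance, so each partition can be read independently.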

See https://smallrye.io/smallrye-reactive-messaging/3.18.0/kafka/receiving-kafka-records/#configuration-reference for details.

Upvotes: 1
