Reputation: 2912
I use quarkus to process incoming messages from Kafka topic that has 4 partitions. While I ingest multiple messages to this topic I see the quarkus-kafka-consumer-code process the message one by one sequentially.
Below is my logic that process the incoming message. Having the topic with 4 partitions should the processEvent
method handle 4 messages in parallel as I run only one instance of this consumer-app. Am I missing anything?
@Incoming("topic-1")
@Retry(delay = 3, maxRetries = 3, retryOn = { MyException.class })
@NonBlocking
public CompletionStage<Void> processEvent(KafkaRecordBatch<String, String> payload) {
Multi.createFrom().iterable(payload.getRecords())
.onItem().transform(EventProcessor1::newLogEvent).filter(l -> !l.isEmptyEvent())
.onItem()
.transform(this::processEvent).onItem().transformToUniAndConcatenate(this::process2).collect().with(Collectors.counting())
.subscribe().with(c -> {
log.info("completed {} events", c);
});
return payload.ack().whenCompleteAsync((s, e) -> {
if (e != null) {
throw new RuntimeException(e);
}
log.debug("batch processed & comitted successfully");
});
});
}
Upvotes: 0
Views: 1482
Reputation: 6577
You can configure the number of partitions that should be consumed concurrently. The configuration looks like this:
mp.messaging.incoming.my-channel.connector=smallrye-kafka
mp.messaging.incoming.my-channel.xxx=xxx
mp.messaging.incoming.my-channel.partitions=4
The partitions
configuration property defaults to 1
, which is what you see.
See https://smallrye.io/smallrye-reactive-messaging/3.18.0/kafka/receiving-kafka-records/#configuration-reference for details.
Upvotes: 1