ltalhouarne

Reputation: 4638

Spring-kafka high volume processing

Using spring-kafka 1.0.5, I am consuming from a busy topic with 10 partitions with a concurrency of 10.

My current code adds each message to a queue chosen by partition ID; the queues are held in a HashMap keyed by that ID.

@KafkaListener(topics = "${kafka.topic}")
public void onMessage(ConsumerRecord consumerRecord, Acknowledgment acknowledgment) {
    // Pseudocode: add consumerRecord to the queue stored under
    // consumerRecord.partition() in a HashMap<Integer, Queue<ConsumerRecord>>.
}

Unfortunately, this design takes about twice as long as simply consuming the messages.

My requirement is to process partitions separately, but how can I avoid keeping a HashMap that the @KafkaListener method has to look up by partition?

Is there a more efficient way of going about this? Ideally, each thread from the listener annotation would manage its own list. Is there a way to do that without having a cross reference such as the hashmap mentioned above based on the partition ID?

Upvotes: 1

Views: 491

Answers (1)

Artem Bilan

Reputation: 121560

Consider declaring a separate @KafkaListener method for each partition you need. To do that, use the topicPartitions attribute instead of topics:

/**
 * Used to add topic/partition information to a {@code KafkaListener}.
 *
 */
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface TopicPartition {
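
To make the suggestion concrete, here is a minimal plain-Java sketch of the idea, with no Spring dependency so it can be run on its own. It only models the design: one handler per partition, each owning a private list, so no shared map lookup is needed. The class and method names (PerPartitionListeners, PartitionHandler, onMessage) are made up for illustration, and the record is reduced to a partition number and a String value. The comment shows where the real annotation would presumably go, assuming the topic and partitions attributes of @TopicPartition.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of "one listener per partition"; not spring-kafka code.
public class PerPartitionListeners {

    /** Hypothetical handler that owns the records of exactly one partition. */
    static class PartitionHandler {
        private final int partition;
        // Private to this handler, so no cross-reference by partition ID is needed.
        private final List<String> records = new ArrayList<>();

        PartitionHandler(int partition) {
            this.partition = partition;
        }

        // In spring-kafka, the equivalent method would carry something like:
        // @KafkaListener(topicPartitions =
        //     @TopicPartition(topic = "${kafka.topic}", partitions = "0"))
        void onMessage(int recordPartition, String value) {
            if (recordPartition != partition) {
                throw new IllegalArgumentException(
                        "unexpected partition " + recordPartition);
            }
            records.add(value);
        }

        List<String> records() {
            return records;
        }
    }

    public static void main(String[] args) {
        // One handler per partition, mirroring one listener method per partition.
        PartitionHandler p0 = new PartitionHandler(0);
        PartitionHandler p1 = new PartitionHandler(1);

        p0.onMessage(0, "a");
        p1.onMessage(1, "b");
        p0.onMessage(0, "c");

        System.out.println("partition 0: " + p0.records());
        System.out.println("partition 1: " + p1.records());
    }
}
```

With ten partitions this means ten listener methods (or ten small listener beans), which is more verbose than a single method with concurrency of 10, but each method only ever sees its own partition.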

Upvotes: 1
