Reputation: 4638
Using spring-kafka 1.0.5, I am consuming from a busy topic with 10 partitions with a concurrency of 10.
My current code adds a message to a queue based on the partition ID which are both persisted in a HashMap.
@KafkaListener(topics = "${kafka.topic}")
public void onMessage(ConsumerRecord consumerRecord, Acknowledgment acknowledgment) {
//Pseudo code
add to Hashmap<Integer, Queue<ConsumerRecord>> based on partition.
}
Unfortunately, that design is taking twice the processing time a simple consumption would take.
My requirement is to process partitions separately but how can avoid having a hashmap with a reference to a partition based on the @KafkaListener.
Is there a more efficient way of going about this? Ideally, each thread from the listener annotation would manage its own list. Is there a way to do that without having a cross reference such as the hashmap mentioned above based on the partition ID?
Upvotes: 1
Views: 491
Reputation: 121560
Consider to declare several @KafkaListener
methods for each of your required partition. For this purpose you should use topicPartitions
attribute instead of topics
:
/**
* Used to add topic/partition information to a {@code KafkaListener}.
*
*/
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface TopicPartition {
Upvotes: 1