Reputation: 81
I have code like this:
@Component
public class PostConsumer {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        ...
        factory.setConcurrency(10);
        return factory;
    }

    @Autowired
    private PostService postService;

    @KafkaListener(topics = {"topic1", "topic2"}, containerFactory = "postKafkaListenerContainerFactory")
    public void sendPost(ConsumerRecord<String, Post> consumerRecord) {
        postService.sendPost(consumerRecord.value());
    }
}
How can I process the partition(s) of topic1 and the partition(s) of topic2 in parallel?
Right now they are processed one by one: first topic1->partition0, then topic2->partition0, then topic1->partition0 again, and so on.
P.S. When I split this method into two @KafkaListener methods, it starts working in parallel. The problem is that I'm providing a huge list of topics, and I don't want to have 100+ @KafkaListener methods for that.
Thank you in advance.
Upvotes: 1
Views: 2262
Reputation: 174554
You need to add concurrency to the listener container. However, if each topic has only one partition, that won't help; you need enough partitions in each topic to support your desired concurrency, or use a separate container for each topic.
We could consider an alternative strategy with an option to distribute the topics themselves across consumers.
Open a GitHub issue and we'll consider it.
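To illustrate why concurrency alone does not help with single-partition topics, here is a minimal plain-Java sketch. It assumes a range-style assignor, which handles each topic independently and hands out its partitions starting from the first consumer. The class and method names are hypothetical, and this is a simplified model, not Kafka's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class RangeAssignmentSketch {

    /**
     * Simplified range-style assignment: every topic is processed on its own,
     * and its partitions are handed out in contiguous blocks, always starting
     * again from consumer 0.
     */
    static Map<Integer, List<String>> assign(List<String> topics, int partitionsPerTopic, int consumers) {
        Map<Integer, List<String>> result = new TreeMap<>();
        for (int c = 0; c < consumers; c++) {
            result.put(c, new ArrayList<>());
        }
        for (String topic : topics) {
            int base = partitionsPerTopic / consumers;   // partitions every consumer gets
            int extra = partitionsPerTopic % consumers;  // first 'extra' consumers get one more
            int next = 0;
            for (int c = 0; c < consumers; c++) {
                int count = base + (c < extra ? 1 : 0);
                for (int i = 0; i < count; i++) {
                    result.get(c).add(topic + "-" + next++);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Three single-partition topics, concurrency 10:
        // consumer 0 receives every partition, the other nine stay idle.
        assign(List.of("topic1", "topic2", "topic3"), 1, 10)
                .forEach((consumer, partitions) -> System.out.println(consumer + ": " + partitions));
    }
}
```

Under this model, every topic's only partition lands on consumer 0, which would explain records from different topics being processed one after another on a single thread.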
EDIT
You can also change the default partition assignor, for example:
spring.kafka.consumer.properties.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
and
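import java.util.List;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer;

@SpringBootApplication
public class Kgh941Application {

    public static void main(String[] args) {
        SpringApplication.run(Kgh941Application.class, args);
    }

    // One listener for all topics; concurrency = 30 creates 30 consumers.
    @KafkaListener(id = "kgh941", concurrency = "30",
            topics = {"kgh941a", "kgh941b", "kgh941c", "kgh941d", "kgh941e", "kgh941f", "kgh941g", "kgh941h"})
    public void listen(String in) {
    }

    // Prints the partitions assigned to each child container once Enter is pressed.
    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry) {
        return args -> {
            System.out.println("Hit enter to get assignments");
            System.in.read();
            MessageListenerContainer container = registry.getListenerContainer("kgh941");
            // Reads the child containers reflectively from the concurrent container.
            @SuppressWarnings("unchecked")
            List<KafkaMessageListenerContainer<?, ?>> containers = (List<KafkaMessageListenerContainer<?, ?>>) new DirectFieldAccessor(
                    container).getPropertyValue("containers");
            containers.forEach(c -> System.out.println(c.getAssignedPartitions().size() + ":"
                    + c.getAssignedPartitions()));
        };
    }

    // Eight topics with 10 partitions each (replication factor 1).
    @Bean
    public NewTopic topica() {
        return new NewTopic("kgh941a", 10, (short) 1);
    }

    @Bean
    public NewTopic topicb() {
        return new NewTopic("kgh941b", 10, (short) 1);
    }

    @Bean
    public NewTopic topicc() {
        return new NewTopic("kgh941c", 10, (short) 1);
    }

    @Bean
    public NewTopic topicd() {
        return new NewTopic("kgh941d", 10, (short) 1);
    }

    @Bean
    public NewTopic topice() {
        return new NewTopic("kgh941e", 10, (short) 1);
    }

    @Bean
    public NewTopic topicf() {
        return new NewTopic("kgh941f", 10, (short) 1);
    }

    @Bean
    public NewTopic topicg() {
        return new NewTopic("kgh941g", 10, (short) 1);
    }

    @Bean
    public NewTopic topich() {
        return new NewTopic("kgh941h", 10, (short) 1);
    }
}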
and the resulting assignments (partition count, then partitions, for each of the 30 consumers):
3:[kgh941b-0, kgh941h-0, kgh941e-0]
2:[kgh941c-1, kgh941f-1]
2:[kgh941c-4, kgh941f-4]
2:[kgh941c-5, kgh941f-5]
2:[kgh941f-6, kgh941c-6]
2:[kgh941c-7, kgh941f-7]
2:[kgh941c-8, kgh941f-8]
2:[kgh941c-9, kgh941f-9]
3:[kgh941d-0, kgh941a-0, kgh941g-0]
3:[kgh941a-1, kgh941d-1, kgh941g-1]
3:[kgh941a-2, kgh941d-2, kgh941g-2]
3:[kgh941a-3, kgh941g-3, kgh941d-3]
3:[kgh941d-4, kgh941a-4, kgh941g-4]
3:[kgh941a-5, kgh941d-5, kgh941g-5]
3:[kgh941a-6, kgh941d-6, kgh941g-6]
3:[kgh941a-7, kgh941g-7, kgh941d-7]
3:[kgh941d-8, kgh941a-8, kgh941g-8]
3:[kgh941d-9, kgh941g-9, kgh941a-9]
3:[kgh941e-1, kgh941b-1, kgh941h-1]
3:[kgh941b-2, kgh941e-2, kgh941h-2]
3:[kgh941b-3, kgh941e-3, kgh941h-3]
3:[kgh941b-4, kgh941h-4, kgh941e-4]
3:[kgh941e-5, kgh941b-5, kgh941h-5]
3:[kgh941b-6, kgh941e-6, kgh941h-6]
3:[kgh941b-7, kgh941e-7, kgh941h-7]
3:[kgh941b-8, kgh941h-8, kgh941e-8]
3:[kgh941e-9, kgh941b-9, kgh941h-9]
2:[kgh941c-0, kgh941f-0]
2:[kgh941f-2, kgh941c-2]
2:[kgh941c-3, kgh941f-3]
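The distribution above (20 consumers with three partitions, 10 with two) can be reproduced with a small plain-Java sketch of round-robin assignment. This is a simplified model that assumes every consumer subscribes to every topic; the class and method names are hypothetical, and the order in which consumers are printed will differ from the container order in the output above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class RoundRobinAssignmentSketch {

    /**
     * Simplified round-robin assignment: all topic-partitions are laid out in
     * one list (topic by topic) and dealt to the consumers in turn, so the
     * position carries over from one topic to the next.
     */
    static Map<Integer, List<String>> assign(List<String> topics, int partitionsPerTopic, int consumers) {
        Map<Integer, List<String>> result = new TreeMap<>();
        for (int c = 0; c < consumers; c++) {
            result.put(c, new ArrayList<>());
        }
        int slot = 0; // running index over all topic-partitions
        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                result.get(slot % consumers).add(topic + "-" + p);
                slot++;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> topics = List.of("kgh941a", "kgh941b", "kgh941c", "kgh941d",
                "kgh941e", "kgh941f", "kgh941g", "kgh941h");
        // 8 topics x 10 partitions = 80 partitions dealt across 30 consumers.
        assign(topics, 10, 30)
                .forEach((consumer, partitions) -> System.out.println(partitions.size() + ":" + partitions));
    }
}
```

In the same model, 100 single-partition topics with a concurrency of 10 would give each consumer 10 topics, instead of leaving all of them on one consumer.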
Upvotes: 3