Reputation: 805
We have a Spring Boot application that uses Spring Kafka (2.1.7). We have enabled concurrency, so that we can have one consumer thread per partition. Currently, if we have 3 topics, each with 2 partitions, there are 2 consumer threads, as shown below:
ConsumerThread1 - [topic1-0, topic2-0, topic3-0]
ConsumerThread2 - [topic1-1, topic2-1, topic3-1]
However, instead of one KafkaListener (or consumer thread) per partition, we would like to have one consumer thread per topic. For example:
ConsumerThread1 - [topic1-0, topic1-1]
ConsumerThread2 - [topic2-0, topic2-1]
ConsumerThread3 - [topic3-0, topic3-1]
If that is not possible, even the following setup is fine:
ConsumerThread1 - [topic1-0]
ConsumerThread2 - [topic1-1]
ConsumerThread3 - [topic2-0]
ConsumerThread4 - [topic2-1]
ConsumerThread5 - [topic3-0]
ConsumerThread6 - [topic3-1]
The catch is that we do not know the complete list of topics beforehand (we are using a wildcard topic pattern). A new topic can be added at any time, and a new consumer thread (or threads) should be created for it dynamically at runtime.
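Roughly, the relevant part of our current configuration looks like the sketch below (class names, the topic pattern, and the concurrency value are illustrative):
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Component;

@EnableKafka
@Configuration
class CurrentConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // 2 consumer threads; the partitions of all matched topics are spread across them
        factory.setConcurrency(2);
        return factory;
    }
}

@Component
class WildcardTopicListener {

    // wildcard pattern (illustrative); matches current and future topics
    @KafkaListener(topicPattern = ".*some.topic.suffix", containerFactory = "kafkaListenerContainerFactory")
    public void onMessage(ConsumerRecord<String, String> record) {
        // processing
    }
}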
Is there any way this can be achieved?
Upvotes: 4
Views: 5312
Reputation: 805
Thanks to the suggestions by @Gary Russell, I was able to come up with the following solution, which creates a @KafkaListener bean instance (or consumer thread) per Kafka topic. This way, if there is an issue with messages belonging to a particular topic, it will not affect the processing of the other topics.
Note - The following code throws an InstanceAlreadyExistsException during startup. However, this does not seem to affect the functionality. Using the log output I'm able to verify that there is one bean instance (or thread) per topic, and that they are able to process messages.
@SpringBootApplication
@EnableScheduling
@Slf4j
public class KafkaConsumerApp {
public static void main(String[] args) {
log.info("Starting spring boot KafkaConsumerApp..");
SpringApplication.run(KafkaConsumerApp.class, args);
}
}
@EnableKafka
@Configuration
public class KafkaConfiguration {
private final KafkaProperties kafkaProperties;
@Value("${kafka.brokers:localhost:9092}")
private String bootstrapServer;
@Value("${kafka.consumerClientId}")
private String consumerClientId;
@Value("${kafka.consumerGroupId}")
private String consumerGroupId;
@Value("${kafka.topicMonitorClientId}")
private String topicMonitorClientId;
@Value("${kafka.topicMonitorGroupId}")
private String topicMonitorGroupId;
@Autowired
private ConfigurableApplicationContext context;
@Autowired
public KafkaConfiguration( KafkaProperties kafkaProperties ) {
this.kafkaProperties = kafkaProperties;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory( consumerFactory( consumerClientId, consumerGroupId ) );
factory.getContainerProperties().setAckMode( ContainerProperties.AckMode.MANUAL );
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> topicMonitorContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory( consumerFactory( topicMonitorClientId, topicMonitorGroupId ) );
factory.getContainerProperties().setAckMode( ContainerProperties.AckMode.MANUAL );
factory.getContainerProperties().setConsumerRebalanceListener( new KafkaRebalanceListener( context ) );
return factory;
}
private ConsumerFactory<String, String> consumerFactory( String clientId, String groupId ) {
Map<String, Object> config = new HashMap<>();
config.putAll( kafkaProperties.buildConsumerProperties() );
config.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer );
config.put( ConsumerConfig.CLIENT_ID_CONFIG, clientId );
config.put( ConsumerConfig.GROUP_ID_CONFIG, groupId );
config.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false ); // needs to be turned off for rebalancing during topic addition and deletion
// check -> https://stackoverflow.com/questions/56264681/is-it-possible-to-have-one-kafka-consumer-thread-per-topic/56274988?noredirect=1#comment99401765_56274988
return new DefaultKafkaConsumerFactory<>( config, new StringDeserializer(), new StringDeserializer() );
}
}
@Configuration
public class KafkaListenerConfiguration {
@Bean
@Scope("prototype")
public KafkaMessageListener kafkaMessageListener() {
return new KafkaMessageListener();
}
}
@Slf4j
public class KafkaMessageListener {
/*
* This is the actual message listener that will process messages. It will be instantiated per topic.
*/
@KafkaListener( topics = "${topic}", containerFactory = "kafkaListenerContainerFactory" )
public void receiveHyperscalerMessage( ConsumerRecord<String, String> record, Acknowledgment acknowledgment, Consumer<String, String> consumer ) {
log.debug("Kafka message - ThreadName={}, Hashcode={}, Partition={}, Topic={}, Value={}",
Thread.currentThread().getName(), Thread.currentThread().hashCode(), record.partition(), record.topic(), record.value() );
// do processing
// this is just a sample acknowledgment. it can be optimized to acknowledge after processing a batch of messages.
acknowledgment.acknowledge();
}
}
@Service
public class KafkaTopicMonitor {
/*
* The main purpose of this listener is to detect the rebalance events on our topic pattern, so that
* we can create a listener bean instance (consumer thread) per topic.
*
* Note that we use the wildcard topic pattern here.
*/
@KafkaListener( topicPattern = ".*abc.def.ghi", containerFactory = "topicMonitorContainerFactory" )
public void monitorTopics( ConsumerRecord<String, String> record ) {
// do nothing
}
}
@Slf4j
public class KafkaRebalanceListener implements ConsumerAwareRebalanceListener {
private static final ConcurrentMap<String, KafkaMessageListener> listenerMap = new ConcurrentHashMap<>();
private final ConfigurableApplicationContext context;
public KafkaRebalanceListener( ConfigurableApplicationContext context ) {
this.context = context;
}
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// do nothing
}
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// do nothing
}
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
log.info("OnPartitionsAssigned - partitions={} - {}", partitions.size(), partitions);
Properties props = new Properties();
context.getEnvironment().getPropertySources().addLast( new PropertiesPropertySource("topics", props) );
for( TopicPartition tp: partitions ) {
listenerMap.computeIfAbsent( tp.topic(), key -> {
log.info("Creating messageListener bean instance for topic - {}", key );
props.put( "topic", key );
// create new KafkaMessageListener bean instance
return context.getBean( "kafkaMessageListener", KafkaMessageListener.class );
});
}
}
}
Upvotes: 5
Reputation: 174484
You can use a custom partition assignor to allocate the partitions however you want. It's a Kafka consumer property (partition.assignment.strategy).
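For illustration, a rough sketch of such an assignor (the class name and the round-robin spreading of whole topics are my own assumptions; it also ignores per-member subscriptions, which is only acceptable when every member subscribes to the same pattern):
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;

public class TopicPerConsumerAssignor extends AbstractPartitionAssignor {

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        // one bucket per consumer in the group
        List<String> consumers = new ArrayList<>(subscriptions.keySet());
        Collections.sort(consumers);
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        consumers.forEach(consumer -> assignment.put(consumer, new ArrayList<>()));

        // hand all partitions of each topic to a single consumer; topics are spread round-robin
        List<String> topics = new ArrayList<>(partitionsPerTopic.keySet());
        Collections.sort(topics);
        int next = 0;
        for (String topic : topics) {
            String consumer = consumers.get(next++ % consumers.size());
            for (int partition = 0; partition < partitionsPerTopic.get(topic); partition++) {
                assignment.get(consumer).add(new TopicPartition(topic, partition));
            }
        }
        return assignment;
    }

    @Override
    public String name() {
        return "topic-per-consumer";
    }
}
It would then be registered through the consumer property, e.g. config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, TopicPerConsumerAssignor.class.getName()).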
EDIT
See this answer.
It is for a @JmsListener, but the same technique can be applied to Kafka too.
Upvotes: 1
Reputation: 39978
Starting with spring-kafka 2.2, you can create a separate container for each topic and set its concurrency to 1, so that each container consumes from a single topic.
Starting with version 2.2, you can use the same factory to create any ConcurrentMessageListenerContainer. This might be useful if you want to create several containers with similar properties or you wish to use some externally configured factory, such as the one provided by Spring Boot auto-configuration. Once the container is created, you can further modify its properties, many of which are set by using container.getContainerProperties(). The following example configures a ConcurrentMessageListenerContainer:
@Bean
public ConcurrentMessageListenerContainer<String, String> messageListenerContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> factory) {
    ConcurrentMessageListenerContainer<String, String> container =
        factory.createContainer("topic1", "topic2");
    container.setMessageListener(m -> { ... } );
    return container;
}
Note: Containers created this way are not added to the endpoint registry. They should be created as @Bean definitions so that they are registered with the application context.
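As a rough sketch of how that could be combined with the dynamic-topic requirement (class and method names are illustrative, and the group id is assumed to come from the consumer factory), a new single-threaded container can be created and started per topic at runtime:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;

public class PerTopicContainerStarter {

    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

    public PerTopicContainerStarter(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        this.factory = factory;
    }

    // Create and start a dedicated container for one topic, e.g. when a new topic is detected.
    public ConcurrentMessageListenerContainer<String, String> startContainerFor(String topic) {
        ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topic);
        container.setConcurrency(1); // one consumer thread per topic
        container.getContainerProperties().setMessageListener(
                (MessageListener<String, String>) this::process);
        container.start(); // containers created this way are not in the registry, so start explicitly
        return container;
    }

    private void process(ConsumerRecord<String, String> record) {
        // actual message handling goes here
    }
}
Stopping and destroying such containers is then the application's responsibility, since they are not managed by the endpoint registry.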
Upvotes: 5