Reputation: 6624
I have 1 kafka topic with only 1 partition.
At any point of time, there can be multiple kafka clients. All of the clients are subscribed using same consumer group. So at any given point of time only 1 client would be receiving messages. Let's say from t0 to t10 consumer1 was getting messages, but after some time it stops getting message and consumer2 is elected new leader(may be because of GC pauses in consumer1). In my consumer1, i want to detect whenever this failover happens, so that it can flush its local state.
Is it possible to do with kafka client code?
Upvotes: 0
Views: 241
Reputation: 1743
It is possible using onPartitionsRevoked
callback method available in ConsumerRebalanceListener
interface.
From description,
A callback method the user can implement to provide handling of offset commits to a customized store. This method will be called during a rebalance operation when the consumer has to give up some partitions. It can also be called when consumer is being closed or is unsubscribing. It is recommended that offsets should be committed in this callback to either Kafka or a custom offset store to prevent duplicate data.
Example Implementation:
private static class ConsumerPartitionAssignmentListener implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println(String.format("Partitions revoke listener: %s", partitions.toString()));
// Add your changes here to flush
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println(String.format("Partitions assignment listener: %s", partitions.toString()));
}
}
Example:
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
public class TestConsumer {
KafkaConsumer<String, String> kafkaConsumer;
public static void main(String[] args) {
TestConsumer consumer = new TestConsumer();
consumer.pollMessages();
}
public TestConsumer() {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kafka-example-consumer");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Arrays.asList("input-topic"), new ConsumerPartitionAssignmentListener());
}
public void pollMessages() {
while(true) {
System.out.println("Polling");
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(5000));
System.out.println(records.count());
}
}
private static class ConsumerPartitionAssignmentListener implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println(String.format("Partitions revoke listener: %s", partitions.toString()));
// Add your changes here to flush
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println(String.format("Partitions assignment listener: %s", partitions.toString()));
}
}
}
Out:
Poll
Partitions assignment listener: [input-topic-0]
0
Poll
0
Poll
Partitions revoke listener: [input-topic-0]
Partitions assignment listener: [input-topic-0]
0
Poll
0
Upvotes: 1