Reputation: 1302
I want to purge a topic in Kafka. So I set the retention time to 1 seconds using
/opt/kafka/bin/kafka-configs.sh --zookeeper zk.com --entity-type topics --entity-name my_topic --alter --add-config retention.ms=1000
log.retention.check.interval.ms=300000
i.e 5 mins so I wait 7 minutes and then reset the above retention value
/opt/kafka/bin/kafka-configs.sh --zookeeper zk.com --entity-type topics --entity-name my_topic --alter --delete-config retention.ms
How can I know for certain the the topic is indeed purged?
Upvotes: 1
Views: 471
Reputation: 1
A quick Unix script which i use in local, where you actually check the size of topic in disk:
# Kafka broker details
kafka_dir="/path/to/kafka"
kafka_bin="${kafka_dir}/bin"
bootstrap_servers="kafka-broker1:9092,kafka-broker2:9092,kafka-broker3:9092"
# Function to set retention to 0 for a topic
set_retention_to_zero() {
topic="$1"
${kafka_bin}/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name "$topic" --add-config retention.ms=0
}
# Function to revert retention to previous value for a topic
revert_retention() {
topic="$1"
${kafka_bin}/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name "$topic" --delete-config retention.ms
}
# Function to get data size of a topic
get_data_size() {
topic="$1"
data_size=$(${kafka_bin}/kafka-log-dirs.sh --describe --bootstrap-server "$bootstrap_servers" --topic-list "$topic" | grep "LogDir: " | awk '{print $5}')
echo "$data_size"
}
# Main loop
while true; do
# Set retention to 0 for each topic
while IFS= read -r topic; do
set_retention_to_zero "$topic"
done < topics.txt
# Check data size for each topic and send email alert
while IFS= read -r topic; do
data_size=$(get_data_size "$topic")
done < topics.txt
# Wait for some time before checking again (e.g., every 30 seconds)
sleep 5
# Revert retention to previous value for each topic
while IFS= read -r topic; do
revert_retention "$topic"
done < topics.txt
done
Upvotes: 0
Reputation: 1302
I believe one of the ways to surely know that the topic is indeed purged is to read it using the option --from-beginning
. For e.g
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server broker.com:9093 --topic my_topic --from-beginning
If it doesn't return any output, then we can be sure it's purged.
Upvotes: 1
Reputation: 1218
Another way to do this is to use the GetOffsetShell
tool. To get the start offset of each partition in the topic run
bin/kafka-run-class.sh kafka.tools.GetOffsetShell -broker-list localhost:9092 --topic <topic> --time -2
and to get the end offsets run
bin/kafka-run-class.sh kafka.tools.GetOffsetShell -broker-list localhost:9092 --topic <topic> --time -1
If reported offsets are equal, the topic is purged.
Upvotes: 1