Reputation: 10375
#!/bin/zsh
zk_servers=('10.138.0.8' '10.138.0.9' '10.138.0.16')
kafka_servers=('10.138.0.13:9092' '10.138.0.14:9092')
topics=('t1' 't2' 't1_failed' 't2_failed')
NORMAL=$(tput sgr0)
GREEN=$(tput setaf 2; tput bold)
YELLOW=$(tput setaf 3)
RED=$(tput setaf 1)
function red() {
  echo -e "$RED$*$NORMAL"
}
function green() {
  echo -e "$GREEN$*$NORMAL"
}
function yellow() {
  echo -e "$YELLOW$*$NORMAL"
}
for topic in $topics; do
  yellow "Cleaning up messages in topic @ $topic"
  yellow "=============================================================="
  # Join the ZooKeeper hosts into one comma-separated connect string (zsh j flag)
  $KAFKA/kafka-topics.sh --zookeeper "${(j:,:)zk_servers}" --alter --topic $topic --config retention.ms=100
  $KAFKA/kafka-topics.sh --zookeeper "${(j:,:)zk_servers}" --describe --topic $topic
done
red "Waiting 120 seconds for messages to expire"
sleep 120
for topic in $topics; do
  green "Restoring config of topic @ $topic"
  green "=============================================================="
  # Join the host arrays into comma-separated lists (zsh j flag)
  $KAFKA/kafka-topics.sh --zookeeper "${(j:,:)zk_servers}" --alter --topic $topic --delete-config retention.ms
  $KAFKA/kafka-topics.sh --zookeeper "${(j:,:)zk_servers}" --describe --topic $topic
  $KAFKA/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "${(j:,:)kafka_servers}" --topic $topic
done
When I run this script, I can see that the topic config retention.ms has changed to 100 ms, but after the 120-second delay I still see the same messages in all Kafka topics.
So how do I purge the messages?
thanks, Dmitry
Upvotes: 3
Views: 3707
Reputation: 2313
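You have to wait for log.retention.check.interval.ms, which defaults to 5 minutes. The broker only looks for expired log segments at that interval, so a 120-second wait can be too short.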
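As a rough sketch of what that means for the script in the question, the wait could be derived from both settings rather than hard-coded. The function name purge_wait_seconds and the 30-second margin are hypothetical, and 300000 ms is only the default check interval; your brokers may be configured differently.

```shell
# Seconds to sleep so that at least one retention check runs after
# retention.ms was lowered. All names here are hypothetical.
# Arguments: retention_ms check_interval_ms margin_s
purge_wait_seconds() {
  local retention_ms=$1 check_interval_ms=$2 margin_s=$3
  # Round the milliseconds up to whole seconds, then add the margin.
  echo $(( (retention_ms + check_interval_ms + 999) / 1000 + margin_s ))
}

# retention.ms=100 as in the question, default check interval of 5 minutes,
# plus an assumed 30-second safety margin.
wait_s=$(purge_wait_seconds 100 300000 30)
echo "Waiting ${wait_s} seconds for messages to expire"
```

In the original script, this value would replace the fixed 120 in the sleep call.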
Upvotes: 10
Reputation: 2145
There is a bit more to it than the accepted answer. Kafka stores messages in log files on the file system. Those files have a rollover (configured by time or size). Once a file is no longer the current file, Kafka will no longer append to that file.
Now for the fun part: Kafka will not expire individual messages. It will (for non-compacted topics) delete whole log files once the highest timestamp of a message in that file is older than retention.ms. The retention time tells you that messages will be available at least this long, but they can remain available much longer (depending on rollover configuration and message volume).
In older Kafka versions, this is based not on the message timestamp but on the last write access to the log file. Thanks to @dawsaw for pointing this out.
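To make the file-level rule concrete, here is a small sketch of the check described above. It is only an illustration, not Kafka's actual code: the function name segment_expired, the segment names, and all timestamps are made up.

```shell
# Succeeds (exit status 0) when a whole log file is old enough to delete:
# its highest message timestamp is more than retention_ms in the past.
# Arguments: now_ms max_ts_ms retention_ms (hypothetical names)
segment_expired() {
  local now_ms=$1 max_ts_ms=$2 retention_ms=$3
  [ $(( now_ms - max_ts_ms )) -gt "$retention_ms" ]
}

now_ms=1000000
retention_ms=100
# Hypothetical log files, written as "name:highest message timestamp in ms".
for seg in old:999000 recent:999950; do
  name=${seg%%:*}
  max_ts=${seg##*:}
  if segment_expired "$now_ms" "$max_ts" "$retention_ms"; then
    echo "$name: deletable"
  else
    echo "$name: kept"
  fi
done
```

Note that the decision depends only on the newest message in the file: a single recent message keeps every older message in the same file alive.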
Upvotes: 11