DmitrySemenov

Reputation: 10375

Why is Kafka's retention.ms not being enforced in Kafka 0.10.2?

```zsh
#!/bin/zsh
zk_servers=('10.138.0.8' '10.138.0.9' '10.138.0.16')
kafka_servers=('10.138.0.13:9092' '10.138.0.14:9092')
topics=('t1' 't2' 't1_failed' 't2_failed')

# --zookeeper and --broker-list each expect ONE comma-separated string,
# so join the arrays instead of letting zsh expand them into separate words.
zk_connect=${(j:,:)zk_servers}
broker_list=${(j:,:)kafka_servers}

NORMAL=$(tput sgr0)
GREEN=$(tput setaf 2; tput bold)
YELLOW=$(tput setaf 3)
RED=$(tput setaf 1)

function red() {
    echo -e "$RED$*$NORMAL"
}

function green() {
    echo -e "$GREEN$*$NORMAL"
}

function yellow() {
    echo -e "$YELLOW$*$NORMAL"
}

# Temporarily shorten retention so existing messages become eligible for deletion.
for topic in $topics; do
   yellow "Cleaning up messages in topic @ " $topic
   yellow "=============================================================="
   $KAFKA/kafka-topics.sh --zookeeper $zk_connect --alter --topic $topic --config retention.ms=100
   $KAFKA/kafka-topics.sh --zookeeper $zk_connect --describe --topic $topic
done

red "Waiting 120 seconds for messages to expire"
sleep 120

# Remove the per-topic override so the topics fall back to the broker default.
for topic in $topics; do
   green "Restoring config of topic @ " $topic
   green "=============================================================="
   $KAFKA/kafka-topics.sh --zookeeper $zk_connect --alter --topic $topic --delete-config retention.ms
   $KAFKA/kafka-topics.sh --zookeeper $zk_connect --describe --topic $topic
   $KAFKA/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $broker_list --topic $topic
done
```

When I run this script, I can see that retention.ms has changed to 100 ms, but after the 120-second delay I still see the same messages in all Kafka topics.

So how do I purge the messages?

Thanks, Dmitry

Upvotes: 3

Views: 3707

Answers (2)

dawsaw

Reputation: 2313

You have to wait for log.retention.check.interval.ms, which defaults to 5 minutes (300000 ms). That is longer than the 120 seconds your script sleeps.
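Rather than sleeping for a fixed time, the script could poll until every partition's earliest offset has caught up with its latest offset, which means no messages remain. A minimal sketch, assuming `$KAFKA` points at Kafka's `bin` directory as in the question and that `kafka.tools.GetOffsetShell` prints one `topic:partition:offset` line per partition (as it does in 0.10.x, with `--time -2` for earliest and `--time -1` for latest). `topic_is_empty`, `wait_until_empty` and the 10-second poll interval are illustrative choices, not part of Kafka.

```shell
#!/bin/sh
# Hypothetical helpers: wait until a topic has been fully purged by retention.

# topic_is_empty EARLIEST LATEST
# Both arguments are GetOffsetShell outputs ("topic:partition:offset" per line).
# Succeeds when the outputs are non-empty and, partition by partition, identical,
# i.e. the log start offset equals the log end offset everywhere.
topic_is_empty() {
    earliest=$(printf '%s\n' "$1" | sort)
    latest=$(printf '%s\n' "$2" | sort)
    [ -n "$earliest" ] && [ "$earliest" = "$latest" ]
}

# wait_until_empty BROKER_LIST TOPIC TIMEOUT_SECONDS
# Polls every 10 seconds; returns 0 once the topic is empty, 1 on timeout.
wait_until_empty() {
    waited=0
    while [ "$waited" -lt "$3" ]; do
        earliest=$("$KAFKA/kafka-run-class.sh" kafka.tools.GetOffsetShell \
            --broker-list "$1" --topic "$2" --time -2)
        latest=$("$KAFKA/kafka-run-class.sh" kafka.tools.GetOffsetShell \
            --broker-list "$1" --topic "$2" --time -1)
        if topic_is_empty "$earliest" "$latest"; then
            return 0
        fi
        sleep 10
        waited=$((waited + 10))
    done
    return 1
}
```

For example, `wait_until_empty 10.138.0.13:9092,10.138.0.14:9092 t1 600` waits up to ten minutes for `t1` to drain, which covers at least one default retention check.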

Upvotes: 10

ftr

Reputation: 2145

There is a bit more to it than the accepted answer suggests. Kafka stores messages in log segment files on the file system. These files are rolled over based on time or size (segment.ms / segment.bytes). Once a file is no longer the current (active) segment, Kafka no longer appends to it.
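To see these files, list a partition's directory on the broker. A minimal sketch, assuming partition directories under `log.dirs` are named `<topic>-<partition>` and segment files are named after their 20-digit, zero-padded base offset (Kafka's on-disk layout). `list_segments` and `active_segment` are hypothetical helpers, and `/tmp/kafka-logs` below is only the `log.dirs` value from Kafka's sample `server.properties`.

```shell
#!/bin/sh
# Segment files are named by zero-padded base offset, so a plain lexical
# sort is also a numeric sort; the last one is the active segment.

# list_segments PARTITION_DIR -> prints segment (.log) file names, oldest first
list_segments() {
    ls -1 "$1" | grep '\.log$' | sort
}

# active_segment PARTITION_DIR -> prints the segment with the highest base offset,
# i.e. the one Kafka is still appending to
active_segment() {
    list_segments "$1" | tail -n 1
}
```

For example, running `active_segment /tmp/kafka-logs/t1-0` on a broker prints the segment still being written for partition 0 of `t1`; every other `.log` file in that directory is closed.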

Now for the fun part: Kafka does not expire individual messages. For non-compacted topics, it deletes whole log files once the highest message timestamp in the file is older than retention.ms. So retention.ms guarantees that messages stay available at least this long, but they may remain available much longer, depending on the rollover configuration and message volume.

In older Kafka versions, this is based not on the message timestamp but on the log file's last-modified time. Thanks to @dawsaw for pointing this out.
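Under the model described in this answer, a message can outlive retention.ms by up to one roll period plus one retention-check interval. A back-of-the-envelope sketch, assuming timestamp-based retention, that the time-based roll via `segment.ms` actually happens (the topic keeps receiving writes), and no earlier size-based roll; `worst_case_delete_ms` is a hypothetical helper and the result is a conservative upper bound, not a measured value.

```shell
#!/bin/sh
# Conservative upper bound (ms) on how long a message may stay on disk:
#   the segment holding it is rolled after at most segment.ms,
#   its newest message must then age past retention.ms,
#   and the broker's retention check runs every log.retention.check.interval.ms.
worst_case_delete_ms() {
    segment_ms=$1
    retention_ms=$2
    check_interval_ms=$3
    echo $((segment_ms + retention_ms + check_interval_ms))
}

# Defaults: segment.ms = 604800000 (7 days), check interval = 300000 (5 minutes).
worst_case_delete_ms 604800000 100 300000   # prints 605100100
```

With default segment settings, even `retention.ms=100` can therefore leave a message on disk for roughly 7 days under this model.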

Upvotes: 11
