Anirudh

Reputation: 2326

Kafka topic record retention policies not clear

After reading the Kafka docs, I tried using the following two retention settings together:

log.retention.bytes:

The maximum size of the log before deleting it.
Type: long
Default: -1
Importance: high
Update Mode: cluster-wide

log.retention.ms

The number of milliseconds to keep a log file before deleting it. If not set, the value in log.retention.minutes is used. If set to -1, no time limit is applied.
Type: long
Default: null
Importance: high
Update Mode: cluster-wide

I set them as follows:

  1. log.retention.bytes = 1 GB
  2. log.retention.ms = 7 days

Problem Situation

All the messages on my topic currently sit in two log files, both of which are smaller than 1 GB.

Let's say the log.1 file has 400 MB of messages, the oldest of which is more than 7 days old,

and it sits on top of

the log.2 file, which has 500 MB of messages, the newest of which is more than 7 days old.

I understand Kafka would clean up all records belonging to the log.2 file, in other words remove this log file from the topic.

What happens to the records in log.1 that are older than 7 days?

Upvotes: 0

Views: 5193

Answers (2)

Kumar Rohit

Reputation: 507

Two properties define message retention in Kafka: log.retention.bytes and log.retention.ms (at the per-topic, per-partition level). Data is removed on a FIFO basis, i.e., the messages that were pushed to a topic first are deleted first.

The values you have configured are listed below. Note that only the 7-day time limit matches Kafka's default; log.retention.bytes defaults to -1 (no size limit):

log.retention.bytes = 1 GB (per topic per partition)
log.retention.ms = 7 days (per topic)

This means that whichever limit is breached first triggers a data purge in Kafka.

For example, assume the messages in your topic take up 500 MB of space (less than log.retention.bytes) but are older than 7 days (i.e. past the configured log.retention.ms). In this case the data older than 7 days would be purged (on a FIFO basis).

Likewise, if the space occupied by a topic's messages exceeds log.retention.bytes but the messages are not older than log.retention.ms, the data would still be purged (on a FIFO basis).

The concept of making data expire is called cleanup, and the messages on a topic are not removed immediately after they are consumed or expire. What happens in the background is that once either limit is breached, the messages are marked for deletion. There are 3 log cleanup policies in Kafka: DELETE (default), COMPACT, and DELETE AND COMPACT. The Kafka Log Cleaner, a pool of background compaction threads, performs log compaction.

To turn on compaction for a topic, use the topic config cleanup.policy=compact (log.cleanup.policy is the broker-wide equivalent). To set a delay before records can be compacted after they are written, use the topic config min.compaction.lag.ms (broker-wide: log.cleaner.min.compaction.lag.ms). Records won’t get compacted until after this period, which gives consumers time to get every record. This could be the reason that older messages are not getting deleted immediately. You can check the value of the compaction delay property.

Upvotes: 2

Bitswazsky

Reputation: 4708

I'm paraphrasing here from the relevant section of the book Kafka: The Definitive Guide. It will most likely clear up your doubt.

log.retention.bytes : This denotes the total number of bytes of messages retained per partition. So, if we have a topic with 8 partitions, and log.retention.bytes is set to 1GB, then the amount of data retained for the topic will be 8GB at most. This means if we ever choose to increase the number of partitions for a topic, total amount of data retained will also increase.
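To make that arithmetic concrete, here is a minimal Python sketch. The function name is my own and not part of Kafka; it assumes the same size limit applies to every partition, ignores replication, and treats 1 GB as 1024**3 bytes.

```python
def max_topic_retention_bytes(partitions: int, retention_bytes: int) -> int:
    """Approximate upper bound on data retained for a whole topic.

    The size limit applies per partition, so the bound scales with the
    partition count. A retention_bytes of -1 means "no size limit",
    in which case there is no bound and -1 is returned.
    """
    if retention_bytes == -1:
        return -1
    return partitions * retention_bytes


GIB = 1024 ** 3
print(max_topic_retention_bytes(8, GIB) // GIB)   # 8
```

Doubling the partition count to 16 with the same setting doubles the bound to 16 GB, which is the effect the paragraph above warns about.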

log.retention.ms : The most common configuration for how long Kafka will retain messages is by time. The default is specified in the configuration file using the log.retention.hours parameter, and it is set to 168 hours, or one week. However, there are two other parameters allowed, log.retention.minutes and log.retention.ms. All three of these specify the same configuration (the amount of time after which messages may be deleted), but the recommended parameter to use is log.retention.ms, because if more than one is specified, the smaller unit size takes precedence. This makes sure that the value set for log.retention.ms is always the one used.
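The precedence rule can be sketched in a few lines of Python. This is only an illustration of the rule as described above, not Kafka's own code; the function and its parameter names are hypothetical, and None stands for "not set".

```python
from typing import Optional


def effective_retention_ms(ms: Optional[int] = None,
                           minutes: Optional[int] = None,
                           hours: int = 168) -> int:
    """Resolve the time limit: ms wins over minutes, which wins over hours.

    A negative value is treated as "no time limit" and reported as -1.
    """
    if ms is not None:
        return ms if ms >= 0 else -1
    if minutes is not None:
        return minutes * 60 * 1000 if minutes >= 0 else -1
    return hours * 60 * 60 * 1000 if hours >= 0 else -1


print(effective_retention_ms())                        # 604800000 (7 days)
print(effective_retention_ms(ms=86400000, hours=48))   # 86400000 (ms wins)
```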

Retention By Time and Last Modified Times : Retention by time is performed by examining the last modified time (mtime) on each log segment file on disk. Under normal cluster operations, this is the time that the log segment was closed, and represents the timestamp of the last message in the file. However, when using administrative tools to move partitions between brokers, this time is not accurate and will result in excess retention for these partitions.
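As a rough illustration of that mtime check, the following Python sketch lists the segment files in a partition directory whose last modified time is past the limit. It is a read-only approximation I wrote for this answer, not what the broker runs; the function name is hypothetical, and it assumes segment files are the ones ending in .log.

```python
import os
import time
from typing import List, Optional


def expired_segments(partition_dir: str, retention_ms: int,
                     now: Optional[float] = None) -> List[str]:
    """Return the .log segment files whose mtime is older than retention_ms.

    Nothing is deleted; this only reports which segments would qualify.
    """
    if retention_ms < 0:              # -1 means no time limit
        return []
    now = time.time() if now is None else now
    cutoff = now - retention_ms / 1000.0
    expired = []
    for name in sorted(os.listdir(partition_dir)):
        path = os.path.join(partition_dir, name)
        # Only segment data files count; index files are ignored here.
        if name.endswith(".log") and os.path.getmtime(path) < cutoff:
            expired.append(name)
    return expired
```

Because the check is per segment file, a segment that was modified recently is kept whole, even if some of the records inside it are older than the limit.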

Configuring Retention by Size and Time : If you have specified a value for both log.retention.bytes and log.retention.ms (or another parameter for retention by time), messages may be removed when either criterion is met. For example, if log.retention.ms is set to 86400000 (1 day) and log.retention.bytes is set to 1000000000 (1 GB), it is possible for messages that are less than 1 day old to get deleted if the total volume of messages over the course of the day is greater than 1 GB. Conversely, if the volume is less than 1 GB, messages can be deleted after 1 day even if the total size of the partition is less than 1 GB.
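Applied to the two files in the question, the combined rule can be sketched as below. This is a simplified model under my own assumptions, not Kafka's implementation: the Segment class and function are hypothetical, segments are listed oldest first, the special handling of the active segment is ignored, and I assume the newest record in log.2 is 8 days old and the newest record in log.1 is 1 day old.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Segment:
    name: str
    size_bytes: int
    newest_record_age_ms: int   # age of the newest record in the segment


def segments_to_delete(segments: List[Segment], retention_ms: int,
                       retention_bytes: int) -> List[str]:
    """Return names of segments eligible for deletion (oldest first)."""
    doomed = []
    remaining = list(segments)
    # Time rule: a segment qualifies only once its newest record is too old.
    if retention_ms >= 0:
        while remaining and remaining[0].newest_record_age_ms > retention_ms:
            doomed.append(remaining.pop(0).name)
    # Size rule: drop oldest segments while what is left still holds
    # at least retention_bytes.
    if retention_bytes >= 0:
        excess = sum(s.size_bytes for s in remaining) - retention_bytes
        while remaining and excess - remaining[0].size_bytes >= 0:
            excess -= remaining[0].size_bytes
            doomed.append(remaining.pop(0).name)
    return doomed


MB, GB, DAY_MS = 1024 ** 2, 1024 ** 3, 24 * 60 * 60 * 1000
log = [Segment("log.2", 500 * MB, 8 * DAY_MS),   # assumed ages, see above
       Segment("log.1", 400 * MB, 1 * DAY_MS)]
print(segments_to_delete(log, 7 * DAY_MS, 1 * GB))   # ['log.2']
```

Under these assumptions only log.2 goes. The records in log.1 that are already older than 7 days stay on disk until the newest record in log.1 also passes the 7-day limit, at which point the whole file becomes eligible.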

Upvotes: 1
