Hobo Joe

Reputation: 155

Expired Apache Kafka messages not getting deleted

I have a Spring Boot (2.1.3) service publishing messages to a Kafka (2.12-2.3.0) topic. The service creates the topic and then, once the service is up, sets retention.ms to 1 second.

I'm currently debugging this code:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync
public class MetricsMsApplication {

    public static void main(String[] args) {
        SpringApplication.run(MetricsMsApplication.class, args);
    }

    // Create the topic with 10 partitions and a replication factor of 1
    @Bean
    public NewTopic topic1() {
        return new NewTopic("metrics", 10, (short) 1);
    }

    @EventListener(ApplicationReadyEvent.class)
    private void init() throws ExecutionException, InterruptedException {
        Map<String, Object> config = new HashMap<>();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        AdminClient client = AdminClient.create(config);

        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "metrics");

        // Update the retention.ms value
        ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1000");
        Map<ConfigResource, Config> updateConfig = new HashMap<>();
        updateConfig.put(resource, new Config(Collections.singleton(retentionEntry)));
        AlterConfigsResult alterConfigsResult = client.alterConfigs(updateConfig);
        alterConfigsResult.all().get(); // wait for the config change to be applied
        client.close();
    }
}

I send in a couple of messages and count to 5, then start a console consumer

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic admst-metrics --from-beginning

and still get the messages that should have expired.

The Kafka logs show that the retention.ms config was applied. I also added cleanup.policy and set it to delete, but that shouldn't be necessary since it's the default.

What will make these messages get deleted?

Upvotes: 3

Views: 3029

Answers (1)

radai

Reputation: 24192

The short answer: Kafka wasn't designed to honor such low retention values.

The longer answer:

Kafka stores the data for any (topic) partition in segment files. At any time a single segment is "active" and being written to, while all the older segments are "closed". Retention/compaction is only applied to non-active segments.

Kafka rolls new segments when either log.roll.ms or log.segment.bytes is hit. The defaults (see https://kafka.apache.org/documentation/#brokerconfigs) are 7 days and/or ~1GB.
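
The topic-level counterparts of those broker settings are segment.ms and segment.bytes. Purely as an illustration (the values are arbitrary, not a recommendation), forcing faster segment rolls with the same AdminClient approach used in the question could look roughly like this:

// Sketch only: lower segment.ms so segments roll, and thus become eligible for deletion, sooner.
// Reuses the question's AdminClient ("client") and imports, plus java.util.Arrays; values are illustrative.
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "metrics");
ConfigEntry segmentMs = new ConfigEntry(TopicConfig.SEGMENT_MS_CONFIG, "10000");    // roll roughly every 10 seconds
ConfigEntry retentionMs = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1000"); // delete closed segments after ~1 second
Map<ConfigResource, Config> updates = new HashMap<>();
updates.put(resource, new Config(Arrays.asList(segmentMs, retentionMs)));
client.alterConfigs(updates).all().get(); // block until the broker acknowledges the change

Even then, the delete-delay and cleaner settings below still apply.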

There's also log.segment.delete.delay.ms, which by default means any segment is retained for at least a minute.

The work of compacting/deleting non-active segments is done by log cleaner threads. Those sleep for log.cleaner.backoff.ms (15 seconds) when no work is found, and only check whether any particular segment can be cleaned every log.retention.check.interval.ms (5 minutes).
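
If you want to see the values your broker is actually running with, one option (a sketch, assuming a single broker with id "0" and reusing the question's AdminClient) is to read the broker config back:

// Sketch: read back the broker configuration to inspect the effective log.* settings.
ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
Config brokerConfig = client.describeConfigs(Collections.singleton(broker)).all().get().get(broker);
for (ConfigEntry entry : brokerConfig.entries()) {
    if (entry.name().startsWith("log.")) { // e.g. log.roll.ms, log.retention.check.interval.ms
        System.out.println(entry.name() + " = " + entry.value());
    }
}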

The result of all this is that retention values anywhere near what you're looking for are not possible by default.

You could try tweaking all the above values, and see how low you can go, but I'm betting this won't scale well for a large number of topics.
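
If you do experiment, the topic-level overrides can also be supplied when the topic is created rather than in a separate alterConfigs call. A sketch building on the question's topic1() bean (values illustrative only):

@Bean
public NewTopic topic1() {
    // Sketch: apply the topic-level overrides at creation time.
    Map<String, String> overrides = new HashMap<>();
    overrides.put(TopicConfig.RETENTION_MS_CONFIG, "1000"); // illustrative
    overrides.put(TopicConfig.SEGMENT_MS_CONFIG, "10000");  // illustrative
    return new NewTopic("metrics", 10, (short) 1).configs(overrides);
}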

Upvotes: 5
