Reputation: 155
I have a Spring Boot (2.1.3) service publishing messages to a Kafka (2.12-2.3.0) topic. The service creates the topic and then, once it is up, sets retention.ms on the topic to 1 second.
@SpringBootApplication
@EnableAsync
public class MetricsMsApplication {

    public static void main(String[] args) {
        SpringApplication.run(MetricsMsApplication.class, args);
    }

    @Bean
    public NewTopic topic1() {
        return new NewTopic("metrics", 10, (short) 1);
    }

    @EventListener(ApplicationReadyEvent.class)
    private void init() throws ExecutionException, InterruptedException {
        Map<String, Object> config = new HashMap<>();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        AdminClient client = AdminClient.create(config);

        // Update the retention.ms value on the already-created topic
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "metrics");
        ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1000");
        Map<ConfigResource, Config> updateConfig = new HashMap<>();
        updateConfig.put(resource, new Config(Collections.singleton(retentionEntry)));

        AlterConfigsResult alterConfigsResult = client.alterConfigs(updateConfig);
        alterConfigsResult.all().get(); // block until the broker has applied the change
        client.close();
    }
}
I send in a couple of messages, wait about five seconds, then start a console consumer:
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic metrics --from-beginning
and still get the messages that should have expired.
The Kafka logs show the retention.ms config was applied. I also added cleanup.policy=delete, but that shouldn't be necessary since delete is the default.
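If it helps anyone reproduce this, the override can also be read back through the AdminClient (a rough sketch, not verbatim from my service):

ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "metrics");
Config current = client.describeConfigs(Collections.singleton(resource))
        .all().get()      // Map<ConfigResource, Config>
        .get(resource);
// Should print 1000 if the override took effect
System.out.println(current.get(TopicConfig.RETENTION_MS_CONFIG).value());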
What will make these messages get deleted?
Upvotes: 3
Views: 3029
Reputation: 24192
The short answer: Kafka wasn't designed to honor retention values this low.
The longer answer:
Kafka stores the data for any topic partition in segment files. At any time a single segment is "active" and being written into, while all the older segments are "closed". Retention/compaction is only applied to non-active segments.
Kafka rolls a new segment when either log.roll.ms or log.segment.bytes is hit. The defaults (see https://kafka.apache.org/documentation/#brokerconfigs) are 7 days and ~1GB, respectively.
There's also log.segment.delete.delay.ms, which by default means any segment file sticks around for at least a minute.
The work of compacting/deleting non-active segments is done by log cleaner threads. Those sleep for log.cleaner.backoff.ms (15 seconds) when no work is found, and only check whether any particular segment can be cleaned every log.retention.check.interval.ms (5 minutes).
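To put rough numbers on it, in server.properties terms the relevant broker defaults look like this (per the docs linked above):

# effective roll interval for the active segment (the actual default key is log.roll.hours=168)
log.roll.ms=604800000
# ...or roll once the active segment reaches ~1GB
log.segment.bytes=1073741824
# a rolled segment's file stays on disk for at least this long before deletion
log.segment.delete.delay.ms=60000
# idle cleaner threads sleep this long between checks for work
log.cleaner.backoff.ms=15000
# how often segments are checked against the retention settings
log.retention.check.interval.ms=300000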
The result of all this is that retention values anywhere near what you're looking for are not possible by default.
You could try tweaking all the above values and see how low you can go (a sketch follows), but I'm betting this won't scale well for a large number of topics.
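For example, a per-topic experiment might look like the sketch below (hypothetical values; segment.ms and file.delete.delay.ms are the topic-level counterparts of the broker's roll/delete-delay settings):

Map<String, Object> adminProps = new HashMap<>();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient admin = AdminClient.create(adminProps)) {
    ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "metrics");
    List<ConfigEntry> entries = Arrays.asList(
            new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1000"),       // expire quickly
            new ConfigEntry(TopicConfig.SEGMENT_MS_CONFIG, "1000"),         // roll segments every second
            new ConfigEntry(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "0")); // delete rolled files immediately
    admin.alterConfigs(Collections.singletonMap(topic, new Config(entries))).all().get();
}

Note that log.retention.check.interval.ms and log.cleaner.backoff.ms have no per-topic equivalents, so even with the overrides above you're still bounded by the broker-wide 5-minute retention check unless you lower that too.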
Upvotes: 5