Reputation: 941
I use the Kafka Streams reduce function, and it creates an internal changelog topic for its state store (like app-KSTREAM-REDUCE-STATE-STORE-0000000002-changelog).
I wanted to set retention.bytes and change the cleanup policy to delete to prevent the storage from filling up, so I set the following configs in my Kafka Streams code:
import java.util.Properties;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// StreamsConfig.TOPIC_PREFIX ("topic.") routes these settings to the internal topics
props.put(StreamsConfig.TOPIC_PREFIX + TopicConfig.RETENTION_BYTES_CONFIG, Constants.INTERNAL_TOPICS_RETENTION_BYTES);
props.put(StreamsConfig.TOPIC_PREFIX + TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
KafkaStreams streams = new KafkaStreams(builder.build(), props);
However, when a new internal topic is created, only the retention config is applied to it; the cleanup policy remains compact.
Is there a step I am missing? (Or is it not possible to set the cleanup policy of internal topics to delete?)
I use Kafka (broker) version 1.0.0 and kafka-streams version 1.0.0.
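One way to check which overrides actually landed on the changelog topic is to describe its config with the Java AdminClient. A minimal sketch; the broker address is an assumption:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

public class DescribeChangelogConfig {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // assumption: broker address
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource changelog = new ConfigResource(ConfigResource.Type.TOPIC,
                    "app-KSTREAM-REDUCE-STATE-STORE-0000000002-changelog");
            Config config = admin.describeConfigs(Collections.singleton(changelog))
                    .all().get().get(changelog);
            // Print the two settings in question; ConfigEntry.toString shows name and value
            System.out.println(config.get(TopicConfig.CLEANUP_POLICY_CONFIG));
            System.out.println(config.get(TopicConfig.RETENTION_BYTES_CONFIG));
        }
    }
}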
Upvotes: 6
Views: 3263
Reputation: 941
Thanks to Guozhang for his answer on the Kafka mailing list:
The issue you described seems like an old bug that is resolved since 1.1.0 (as part of the fix in https://jira.apache.org/jira/browse/KAFKA-6150).
... You do not need to upgrade the broker in order to use newer Streams library versions.
Upgrading the kafka-streams version to 1.1.0 fixed the issue.
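If upgrading is not immediately possible, one workaround (my own sketch, not part of Guozhang's reply) is to alter the changelog topic's config after Streams has created it, using the Java AdminClient. Note that alterConfigs replaces the topic's entire set of overrides, so any other per-topic overrides must be re-specified; the broker address and retention value below are assumptions:

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

public class FixChangelogCleanupPolicy {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // assumption: broker address
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource changelog = new ConfigResource(ConfigResource.Type.TOPIC,
                    "app-KSTREAM-REDUCE-STATE-STORE-0000000002-changelog");
            // alterConfigs overwrites all overrides, so re-state retention.bytes too
            // (assumption: 1 GiB, i.e. the value behind Constants.INTERNAL_TOPICS_RETENTION_BYTES)
            Config config = new Config(Arrays.asList(
                    new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE),
                    new ConfigEntry(TopicConfig.RETENTION_BYTES_CONFIG, "1073741824")));
            admin.alterConfigs(Collections.singletonMap(changelog, config)).all().get();
        }
    }
}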
Upvotes: 5