Amir Masud Zare Bidaki
Amir Masud Zare Bidaki

Reputation: 941

Kafka-streams: setting internal topics cleanup policy to delete doesn't work

I use kafka streams reduce function and it creates some state store change log kafka internal topic ( like app-KSTREAM-REDUCE-STATE-STORE-0000000002-changelog ).

I wanted to set retention bytes and change cleanup policy to delete to prevent the storage being full. So I set following configs in kafka streams code:

Properties props = new Properties();
props.put(StreamsConfig.TOPIC_PREFIX + TopicConfig.RETENTION_BYTES_CONFIG, Constants.INTERNAL_TOPICS_RETENTION_BYTES);
props.put(StreamsConfig.TOPIC_PREFIX + TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
KafkaStreams streams = new KafkaStreams(builder.build(), props);

However, when a new topic is generated, only the retention config is applied to the newly generated internal topic and cleanup policy remains compact.

Is there any missing step to do so ? ( or Isn't it possible to set internal topics cleanup policy to delete ?)

I use kafka version 1.0.0 and kafka-streams version 1.0.0

Upvotes: 6

Views: 3263

Answers (1)

Amir Masud Zare Bidaki
Amir Masud Zare Bidaki

Reputation: 941

Thanks Guozhang for his answer in kafka mailing list:

The issue you described seems like an old bug that is resolved since 1.1.0 (as part of the fix in https://jira.apache.org/jira/browse/KAFKA-6150).

... You do not need to upgrade broker in order to use newer Streams library versions.

Upgrading kafka-streams version to 1.1.0 fixed the issue.

Upvotes: 5

Related Questions