Swapnil Dixit

Reputation: 101

Kafka Streams application still tries to create a changelog topic even though I am setting the optimization property

Below is the code snippet I am using:

`streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "brokerIP:port");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

StreamsBuilder builder = new StreamsBuilder();
KStream streamData = builder.stream(inputTopicName);

streamData.groupByKey(Grouped.with(jsonSerde, jsonSerde))
    .aggregate(
        // some transformation
    );

KafkaStreams kafkaStreams = new KafkaStreams(builder.build(streamsConfiguration), streamsConfiguration);`

I checked the Confluent page on optimization and followed the suggested changes, but the application still tries to create a changelog topic.

Upvotes: 1

Views: 1698

Answers (1)

bbejeck

Reputation: 1370

I understand your confusion here, but the optimization for changelog topics is only for source KTable operations. For example:

KTable<String, String> someTable = builder.table("topic");

With optimizations turned on, Kafka Streams will reuse the source topic of the KTable as its changelog topic instead of creating a separate one.
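As a rough sketch of the source KTable case (not taken from your application): the class name `SourceTableOptimization` and the helper methods are made up for illustration, and the topic name `"topic"` is the placeholder from the line above. The literal key `"topology.optimization"` is the string behind the `StreamsConfig.TOPOLOGY_OPTIMIZATION` constant used in the question; I use the literal here only because the constant name differs between Kafka versions.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;

public class SourceTableOptimization {

    // Hypothetical helper: the same settings as in the question, minus the default serdes.
    static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "brokerIP:port");
        // Same key as StreamsConfig.TOPOLOGY_OPTIMIZATION in the question.
        props.put("topology.optimization", StreamsConfig.OPTIMIZE);
        return props;
    }

    // Builds a topology whose only node is a KTable read directly from a topic.
    static Topology buildTopology(Properties props) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> someTable =
            builder.table("topic", Consumed.with(Serdes.String(), Serdes.String()));
        // The properties have to be handed to build() for the optimization to be applied.
        return builder.build(props);
    }

    public static void main(String[] args) {
        // Printing the description does not need a running broker.
        System.out.println(buildTopology(config()).describe());
    }
}
```

Your snippet already passes the configuration to `builder.build(...)`, so that part is fine; the point is only that this optimization covers tables like `someTable`, not the store behind an aggregation.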

If you don't want Kafka Streams to create a changelog topic for your aggregation, you'll need to explicitly disable it by passing a Materialized config object on which withLoggingDisabled() has been called:

streamData.groupByKey(Grouped.with(jsonSerde, jsonSerde))
    .aggregate(initializer, aggregator, Materialized.as("store-name").withLoggingDisabled());

The above is just one example; you can also use Materialized to configure the key and value Serdes if required. But the bottom line is that to prevent changelog topics, you need to use Materialized.withLoggingDisabled().
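Here is a slightly fuller sketch of the same idea that should compile on its own. Everything specific in it is an assumption for illustration: the class name `AggregateWithoutChangelog`, the topic `"input-topic"`, String keys and values instead of your JSON serdes, and an aggregation that just sums value lengths. Note the explicit type arguments on `Materialized.as(...)`; when further calls are chained onto it, the compiler usually cannot infer them.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class AggregateWithoutChangelog {

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // "input-topic" is a placeholder for inputTopicName in the question.
        KStream<String, String> streamData =
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

        streamData
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .aggregate(
                () -> 0L,                                        // initializer
                (key, value, total) -> total + value.length(),   // aggregator (illustrative)
                Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("store-name")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Long())
                    // No changelog topic is created for "store-name".
                    .withLoggingDisabled());

        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(buildTopology().describe());
    }
}
```

Keep in mind the trade-off: with logging disabled, the store's contents cannot be restored from a changelog topic if the instance fails or the task moves to another instance.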

Additionally, you can configure changelog topics using the StoreBuilder. The Javadoc for Stores has a brief example of using a StoreBuilder.
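For the StoreBuilder route, a minimal sketch along the lines of the Stores Javadoc could look like the following. The class name `StoreBuilderLogging`, the store name `"counts-store"`, the String/Long types, and the `min.insync.replicas` setting are placeholders I picked for illustration, not something from your application.

```java
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class StoreBuilderLogging {

    // A persistent key-value store with no changelog topic.
    static StoreBuilder<KeyValueStore<String, Long>> withoutChangelog() {
        return Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("counts-store"),
                Serdes.String(),
                Serdes.Long())
            .withLoggingDisabled();
    }

    // The same store with a changelog topic that carries extra topic-level config.
    static StoreBuilder<KeyValueStore<String, Long>> withCustomChangelog() {
        Map<String, String> topicConfig =
            Collections.singletonMap("min.insync.replicas", "1");
        return Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("counts-store"),
                Serdes.String(),
                Serdes.Long())
            .withLoggingEnabled(topicConfig);
    }

    public static void main(String[] args) {
        System.out.println(withoutChangelog().loggingEnabled());
        System.out.println(withCustomChangelog().logConfig());
    }
}
```

A store built this way is typically registered with `StreamsBuilder.addStateStore(...)` and used from the Processor API, so it is a separate path from the DSL `aggregate()` call above.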

Upvotes: 1
