Reputation: 101
Below is the code snippet I am using:
```java
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "brokerIP:port");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

StreamsBuilder builder = new StreamsBuilder();
KStream streamData = builder.stream(inputTopicName);

streamData.groupByKey(Grouped.with(jsonSerde, jsonSerde))
    .aggregate( /* some transformation */ );

KafkaStreams kafkaStreams = new KafkaStreams(builder.build(streamsConfiguration), streamsConfiguration);
```
I checked the Confluent page for the optimization method and followed the suggested changes, but it is still trying to generate a changelog topic.
Upvotes: 1
Views: 1698
Reputation: 1370
I understand your confusion here, but the optimization for changelog topics only applies to source `KTable` operations. For example:

```java
KTable<String, String> someTable = builder.table("topic");
```

With optimizations turned on, Kafka Streams will use the source topic as the changelog topic for the `KTable` as well.
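Note that the optimization only takes effect if the properties are also passed to `builder.build()`, as the question's snippet already does. A minimal sketch of that pattern (the topic name is just a placeholder):

```java
Properties props = new Properties();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> someTable = builder.table("topic");

// Passing the properties to build() is what actually applies the optimization;
// calling builder.build() with no arguments builds the unoptimized topology.
Topology topology = builder.build(props);
```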
If you don't want Kafka Streams to create a changelog topic for your aggregation, then you'll need to explicitly disable it with `Materialized.withLoggingDisabled()`:

```java
streamData.groupByKey(Grouped.with(jsonSerde, jsonSerde))
    .aggregate(initializer, aggregator,
               Materialized.as("store-name").withLoggingDisabled());
```
The above is just one example; you can also use `Materialized` to configure the key and value `Serde`s if required. But the bottom line is: to prevent changelog topics, you need to use `Materialized.withLoggingDisabled()`.
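For instance, a sketch of configuring the `Serde`s on the same `Materialized` instance (the store name, `jsonSerde`, and the `JsonNode` value type are placeholders carried over from the question, not a definitive implementation):

```java
streamData.groupByKey(Grouped.with(jsonSerde, jsonSerde))
    .aggregate(
        initializer,
        aggregator,
        // Typed variant of Materialized.as(): lets you attach Serdes
        // and still disable changelog creation in the same chain.
        Materialized.<JsonNode, JsonNode, KeyValueStore<Bytes, byte[]>>as("store-name")
            .withKeySerde(jsonSerde)
            .withValueSerde(jsonSerde)
            .withLoggingDisabled());
```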
Additionally, you can configure changelog topics using a `StoreBuilder`. The Javadoc for `Stores` has a brief example of using a `StoreBuilder`.
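A brief sketch of the `StoreBuilder` route (store name and types here are illustrative):

```java
// Build a persistent key-value store with logging disabled,
// so no changelog topic is created for it.
StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
    Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("store-name"),
            Serdes.String(),
            Serdes.Long())
        .withLoggingDisabled();

// Alternatively, keep logging enabled but override the changelog topic's configs:
// storeBuilder.withLoggingEnabled(Map.of("retention.ms", "86400000"));
```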
Upvotes: 1