I'm using the Kafka Streams API to test some functionality. I have a stream like this:
```java
// Value serde class aligned with the declared value type (UnifiedData)
KStream<String, UnifiedData> stream = builder.stream("topic", Consumed.with(Serdes.String(), new JsonSerde<>(UnifiedData.class)));
stream.groupBy((key, value) -> value.getMetadata().getId())
      .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(1000)))
      .count()
      .toStream()
      .map((key, value) -> {
          System.out.println(value);
          return KeyValue.pair(key.toString(), value);
      });
```
I found two strange behaviours while producing some data to my topic:
1. I only get 20 as output, and not something like 1 2 3...
2. System.out.println(value) only prints the result in my console after a delay.

So, do you think this behaviour is normal? Or might I have a configuration problem with my Kafka setup?
I'm using Kafka 1.0.1, Kafka Streams 1.0.1, Java 8 and Spring Boot.
By default, Kafka Streams uses a record cache to "deduplicate" consecutive outputs from an aggregation, which reduces the downstream load.
You can disable caching globally by setting cache.max.bytes.buffering=0 in your KafkaStreams config. Alternatively, you can disable caching for an individual store by passing a Materialized parameter into the aggregation operator.
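A minimal sketch of the global setting, assuming the config is built as a plain java.util.Properties object. The application id "count-demo" and the broker address localhost:9092 are hypothetical placeholders. The string key "cache.max.bytes.buffering" is the value of StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, so the snippet compiles without the Kafka Streams jar. For the per-store variant in Kafka Streams 1.0.x, the windowed count would take something like .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts-store").withCachingDisabled()), where "counts-store" is a hypothetical store name.

```java
import java.util.Properties;

// Sketch: a Kafka Streams config with record caching disabled globally.
// Plain string keys are used so this compiles without kafka-streams on the
// classpath; "cache.max.bytes.buffering" equals
// StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG.
class NoCacheStreamsConfig {

    static Properties build() {
        Properties props = new Properties();
        // Hypothetical application id and broker address: adjust to your setup.
        props.put("application.id", "count-demo");
        props.put("bootstrap.servers", "localhost:9092");
        // 0 bytes of cache: every update produced by count() is forwarded downstream.
        props.put("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build();
        System.out.println("cache.max.bytes.buffering = "
                + props.getProperty("cache.max.bytes.buffering"));
        // With kafka-streams available, these would be passed to the client, e.g.
        // new KafkaStreams(builder.build(), props);
    }
}
```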
Furthermore, all caches are flushed on commit, and the default commit interval is 30 seconds. Thus, it makes sense that you see output after 30 seconds. If you disable caching, the commit interval no longer has any impact on when output appears.
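To illustrate why a single 20 shows up instead of 1, 2, 3, ..., here is a deliberately simplified Java model of the behaviour described above. It is not Kafka's actual implementation: it ignores windows (the real key is a Windowed<String>), cache size limits and eviction, and multiple stream threads. It only captures two ideas from the answer: consecutive updates to the same key overwrite each other in the cache, and the cache is flushed downstream on commit. The class name CountCacheModel and the key "id-1" are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of how a record cache changes what count() forwards.
// NOT the real Kafka Streams implementation (no windows, no eviction).
class CountCacheModel {
    private final boolean cachingEnabled;
    private final Map<String, Long> store = new LinkedHashMap<>();  // the state store
    private final Map<String, Long> dirty = new LinkedHashMap<>();  // cached, not yet forwarded
    private final List<String> downstream = new ArrayList<>();      // what map()/println would see

    CountCacheModel(boolean cachingEnabled) {
        this.cachingEnabled = cachingEnabled;
    }

    void process(String key) {
        long updated = store.merge(key, 1L, Long::sum);
        if (cachingEnabled) {
            dirty.put(key, updated);              // overwrite: consecutive updates collapse
        } else {
            downstream.add(key + "=" + updated);  // forward every update immediately
        }
    }

    // In Kafka Streams this happens on commit (commit.interval.ms, 30 s by default).
    void commit() {
        for (Map.Entry<String, Long> e : dirty.entrySet()) {
            downstream.add(e.getKey() + "=" + e.getValue());
        }
        dirty.clear();
    }

    List<String> downstream() {
        return downstream;
    }

    public static void main(String[] args) {
        CountCacheModel cached = new CountCacheModel(true);
        CountCacheModel uncached = new CountCacheModel(false);
        for (int i = 0; i < 20; i++) {
            cached.process("id-1");
            uncached.process("id-1");
        }
        System.out.println("cached, before commit: " + cached.downstream());
        cached.commit();
        uncached.commit();
        System.out.println("cached, after commit:  " + cached.downstream());
        System.out.println("uncached:              " + uncached.downstream().size() + " updates");
    }
}
```

With caching enabled, nothing reaches downstream until commit(), and then only id-1=20 is forwarded; with caching disabled, all 20 intermediate counts are forwarded as they happen.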
For more details see: https://kafka.apache.org/documentation/streams/developer-guide/memory-mgmt.html