Reputation: 157
I'm running a Kafka Streams app with a windowed function. After 24 hours of running, local disk usage increased from 5G to 20G and keeps increasing. From what I googled, once I introduce windowedBy, it should remove old data automatically.
My topology looks like this:
stream.selectKey(selectKey A)
      .groupByKey(..)
      .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
      .reduce((value1, value2) -> value2)
      .suppress(..).toStream()
      .selectKey(selectKey B).mapValues(..).filter(..)
      .groupByKey(..).reduce(..).toStream().to(..)
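For completeness, a self-contained sketch of this topology looks like the below (topic names, String serdes, and the key selectors, mapValues, filter, and suppress config are placeholders for my real logic):

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream =
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

stream
        .selectKey((key, value) -> value)                      // "selectKey A" placeholder
        .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
        .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
        .reduce((value1, value2) -> value2)                    // keep the latest value per window
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .selectKey((windowedKey, value) -> windowedKey.key()) // "selectKey B" placeholder
        .mapValues(value -> value)                             // placeholder
        .filter((key, value) -> true)                          // placeholder
        .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
        .reduce((value1, value2) -> value2)
        .toStream()
        .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));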
One thing I can't understand is that this topology creates two internal repartition topics, repartition-03 and repartition-14, for the two groupBy actions. On disk, all machines that take repartition-03 tasks have high disk usage and seem to never remove old data, while machines that run repartition-14 tasks always stay at low disk usage.
When I log in to the machines, I find different paths on those two kinds of machines, as below:
/tmp/kafka-streams/test-group/2_40/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000014
/tmp/kafka-streams/test-group/1_4/KSTREAM-REDUCE-STATE-STORE-0000000003/KSTREAM-REDUCE-STATE-STORE-0000000003.1568808000000
Why do they have different paths? 2_40 is for the repartition-14 tasks and has rocksdb in its path, while the other doesn't contain rocksdb. Meanwhile, task 1_4 keeps a couple of folders like KSTREAM-REDUCE-STATE-STORE-0000000003.1568808000000, but with different suffixes.
I thought that once I introduced the windowedBy function, RocksDB would remove old data once the window expires? And why do the two stores above have different paths and retention behavior?
Any help is highly appreciated! Thanks!
Upvotes: 1
Views: 1540
Reputation: 62330
The default retention period is 24h. You can reduce it via
.reduce(..., Materialized.with(...).withRetention(...));
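For example, a minimal sketch for the windowed reduce (topic name, String serdes, and the 2-hour value are assumptions; retention must be at least window size plus grace period):

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowStore;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("input-topic");

KTable<Windowed<String>, String> reduced = stream
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
        .reduce(
                (value1, value2) -> value2,
                Materialized.<String, String, WindowStore<Bytes, byte[]>>with(
                                Serdes.String(), Serdes.String())
                        // Retention must be >= window size + grace period;
                        // 2h here instead of the 24h default, so old window
                        // segments are dropped from disk much sooner.
                        .withRetention(Duration.ofHours(2)));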
Upvotes: 4