Reputation: 521
I'm using Kafka Streams 0.10.2.1 with a windowed RocksDB state store, and I'm seeing very weird behavior during state store initialization. Inside each task's state store folder, Kafka Streams creates and deletes folders containing RocksDB files for 30 minutes.
If the state store is named XXX, then I see folders being created inside a folder named
State Folder/Task ID/XXX
with names such as
XXX-201710211345
containing RocksDB files. These folders are created, then deleted, and new folders with a different timestamp are created. This goes on for 30 minutes before message processing begins. I'm guessing that RocksDB is reconstructing all the historical states from the state store's changelog topic, but I fail to understand for what purpose, as it eventually deletes all but the last one.
What is the reason that KafkaStreams is creating and deleting these folders?
How can I make KafkaStreams recreate only the latest state?
This is a stripped down version of my topology:
stream
    .map((key, value) -> KeyValue.pair(key, value))
    .through(Serdes.String(), serde, MY_TOPIC)
    .groupByKey(Serdes.String(), serde)
    .count(TimeWindows
            .of(TimeUnit.SECONDS.toMillis(windowDurationSec))
            .until(TimeUnit.SECONDS.toMillis(windowDurationSec) + TimeUnit.SECONDS.toMillis(lateEventGraceTimeSec)),
        "Hourly_Agg")
    .foreach((k, v) -> System.out.println(""));
And here's a (tiny part of) dump from strace:
6552 stat("/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg/Hourly_Agg-201710211230/000006.sst", {st_mode=S_IFREG|0644, st_size=3158, ...}) = 0
6552 unlink("/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg/Hourly_Agg-201710211230/000006.sst") = 0
6552 unlink("/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg/Hourly_Agg-201710211230") = -1 EISDIR (Is a directory)
6552 rmdir("/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg/Hourly_Agg-201710211230") = 0
6552 stat("/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
6552 mkdir("/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg/Hourly_Agg-201710211500", 0755) = 0
6552 rename("/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg/Hourly_Agg-201710211500/LOG", "/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg/Hourly_Agg-201710211500/LOG.old.1508746634575191") = -1 ENOENT (No such file or directory)
Upvotes: 2
Views: 1030
Reputation: 62330
Kafka Streams does recreate only the latest state, and the behavior you see is by design.
For windowed stores, the window retention period is divided into so-called segments, and Streams uses one RocksDB instance per segment to store the corresponding data. This allows segments to be "rolled" as time progresses and data older than the retention time to be deleted efficiently (i.e., an entire segment/RocksDB instance is dropped at once).
When state is recreated, Streams simply reads the whole changelog topic and applies all of those updates to the store. Thus, you see the same segment-rolling behavior as during processing (just compressed into a much smaller time frame). It is not easily possible to "jump" to the latest state, as there is not enough information upfront; blindly replaying the changelog is therefore the best option.
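To make the rolling concrete, here is a minimal sketch (not the actual Streams source) of how a windowed store can map a record timestamp to a segment and a segment directory name. The segment-interval formula (retention divided by number of segments minus one) and the `yyyyMMddHHmm` name format are assumptions inferred from the folder names in the question:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class SegmentNameSketch {

    // Assumption: with retention = window duration + grace time and N segments,
    // the segment interval is roughly retentionMs / (N - 1).
    static long segmentId(long timestampMs, long segmentIntervalMs) {
        return timestampMs / segmentIntervalMs;
    }

    // Directory names like "Hourly_Agg-201710211230" look like the segment's
    // start time formatted as yyyyMMddHHmm (assumed from the question's paths).
    static String segmentName(String storeName, long segmentId, long segmentIntervalMs) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMddHHmm");
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        return storeName + "-" + fmt.format(new Date(segmentId * segmentIntervalMs));
    }

    public static void main(String[] args) {
        long retentionMs = 90 * 60 * 1000L;       // e.g. 1h window + 30min grace
        long segmentIntervalMs = retentionMs / 2; // 3 segments -> retention / (3 - 1)
        long ts = 1508589000000L;                 // some record timestamp
        long id = segmentId(ts, segmentIntervalMs);
        System.out.println(segmentName("Hourly_Agg", id, segmentIntervalMs));
        // As replayed changelog timestamps advance past a segment boundary, the
        // id changes: a new directory is created and the oldest segment
        // (now outside retention) is deleted -- the churn seen in the strace.
    }
}
```

During restore, the changelog's record timestamps sweep through history, so the segment id advances repeatedly and each advance creates one directory and drops another, exactly matching the mkdir/rmdir pattern in the strace output.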
Upvotes: 3