posthumecaver

Reputation: 1843

Kafka Streams KeyValueStore retention.bytes

I am seeing some odd behaviour with a KeyValueStore, and I have an assumption that might explain it. Maybe you can tell me whether I am right or wrong.

I configured a state store as follows:

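import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Changelog-topic configs are passed as Strings, so the long must be converted
Map<String, String> storeConfig = new HashMap<>();
storeConfig.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(TimeUnit.DAYS.toMillis(30)));
storeConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete");

StoreBuilder<KeyValueStore<String, String>> store1 = Stores.keyValueStoreBuilder(
   Stores.persistentKeyValueStore("STORE1"),
   Serdes.String(),
   Serdes.String()
);

streamsBuilder.addStateStore(store1.withLoggingEnabled(storeConfig)); // streamsBuilder: the app's StreamsBuilder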

With this configuration, I expect data older than 30 days to disappear, but I am observing something completely different.

When I look at the RocksDB directory of the store, the file is rolled roughly every 14451 bytes, and the directory has the following structure:

14451  1. Oct 19:00 LOG
14181 30. Sep 15:59 LOG.old.1569854012833395
14451 30. Sep 17:40 LOG.old.1569918431235734
14451  1. Oct 11:05 LOG.old.1569949239434224
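The files in the listing above are named LOG and LOG.old.<timestamp>. In RocksDB these names are normally used for its human-readable info log, which is rotated independently of the stored data; the key-value data itself lives in *.sst files. As a minimal sketch, assuming RocksDB's usual file-naming conventions, a hypothetical helper RocksDbFiles can classify the files found in a store directory:

```java
/**
 * Hypothetical helper: rough classifier for file names in a RocksDB
 * directory, based on RocksDB's usual naming conventions. Anything
 * not recognized is reported as "other".
 */
final class RocksDbFiles {

    private RocksDbFiles() {}

    static String classify(String fileName) {
        if (fileName.equals("LOG") || fileName.startsWith("LOG.old.")) {
            return "info log";        // human-readable diagnostic log, rotated by RocksDB
        }
        if (fileName.endsWith(".sst")) {
            return "data (SST)";      // sorted, immutable table files holding the key-value data
        }
        if (fileName.endsWith(".log")) {
            return "write-ahead log"; // e.g. 000012.log, if the WAL is enabled
        }
        if (fileName.startsWith("MANIFEST-") || fileName.equals("CURRENT")) {
            return "metadata";        // records which SST files make up the database
        }
        if (fileName.startsWith("OPTIONS-")) {
            return "options";         // persisted RocksDB options
        }
        return "other";               // e.g. LOCK, IDENTITY
    }
}
```

Applied to the listing, every entry is classified as "info log", so none of the rotated files are data files.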

It seems that, instead of applying only the configured 30-day retention, it also applies a retention based on file size.

I found on the internet that there is also the topic parameter TopicConfig.RETENTION_BYTES_CONFIG ('retention.bytes'). Do I also have to configure this parameter so that my data stays visible during the retention period and is not deleted because of the file size? (I know there is a value for my key, but I can't access it after this phenomenon occurs.)

Thanks for any answers.

Upvotes: 0

Views: 1571

Answers (1)

Matthias J. Sax

Reputation: 62330

Internally, persistent KeyValueStores use RocksDB, and RocksDB uses a so-called LSM-tree (Log-Structured Merge-tree): it creates many smaller segment files that are later combined into larger ones. After this "compaction" step, the smaller segment files can be deleted, because their data has been copied into the larger segment files. Hence, there is nothing to worry about.
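To illustrate why deleting the small segments loses no data, here is a toy model of LSM-style compaction. It is a deliberately simplified sketch, not RocksDB's actual algorithm; the class ToyLsm and its flush threshold are hypothetical. Writes go into a memtable that is flushed as a small sorted segment, and compaction merges all segments into one, with newer values overwriting older ones.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical toy LSM-tree: not RocksDB's implementation, no deletes. */
final class ToyLsm {
    // Oldest segment first; a segment is never modified after it is flushed.
    private final List<TreeMap<String, String>> segments = new ArrayList<>();
    private TreeMap<String, String> memtable = new TreeMap<>();
    private final int flushThreshold;

    ToyLsm(int flushThreshold) {
        this.flushThreshold = flushThreshold;
    }

    void put(String key, String value) {
        memtable.put(key, value);
        if (memtable.size() >= flushThreshold) {
            segments.add(memtable);     // "flush": the memtable becomes a small segment
            memtable = new TreeMap<>();
        }
    }

    String get(String key) {
        if (memtable.containsKey(key)) return memtable.get(key);
        for (int i = segments.size() - 1; i >= 0; i--) { // newest segment wins
            if (segments.get(i).containsKey(key)) return segments.get(i).get(key);
        }
        return null;
    }

    /** Merge all segments into one larger segment; newer values overwrite older ones. */
    void compact() {
        TreeMap<String, String> merged = new TreeMap<>();
        for (TreeMap<String, String> segment : segments) merged.putAll(segment);
        segments.clear();               // the small segments can be discarded...
        segments.add(merged);           // ...because their data now lives here
    }

    int segmentCount() {
        return segments.size();
    }
}
```

After compact(), segmentCount() drops to 1 while every get() still returns the latest value for its key.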

Furthermore, TopicConfig.RETENTION_MS_CONFIG is a topic configuration: it applies to the store's changelog topic on the brokers and is not related to the local store of a Kafka Streams application. A KeyValueStore retains data forever, until it is explicitly deleted via a "tombstone" (a record with a null value for that key). Hence, if you set a retention time for the underlying changelog topic, the data might be deleted in the topic, but not in the local store.
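The difference between topic retention and the local store can be sketched with a toy model. This is a simplified illustration under stated assumptions, not Kafka's implementation: the class ToyChangeloggedStore is hypothetical, and real brokers delete whole log segments rather than single records. Topic retention only trims the changelog, while the local map keeps every key until a tombstone arrives. The restoreFromChangelog() method shows what a store rebuilt purely from the remaining changelog would contain.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical toy model of a key-value store with a changelog topic. */
final class ToyChangeloggedStore {

    static final class ChangelogRecord {
        final long timestampMs;
        final String key;
        final String value; // null means tombstone

        ChangelogRecord(long timestampMs, String key, String value) {
            this.timestampMs = timestampMs;
            this.key = key;
            this.value = value;
        }
    }

    // Package-private so the state can be inspected in the example.
    final Map<String, String> localStore = new HashMap<>();
    final List<ChangelogRecord> changelog = new ArrayList<>();

    /** Write to the local store and append the change to the changelog. */
    void put(long nowMs, String key, String value) {
        if (value == null) localStore.remove(key); else localStore.put(key, value);
        changelog.add(new ChangelogRecord(nowMs, key, value));
    }

    /** Simulates retention.ms: drops old records from the changelog only. */
    void applyTopicRetention(long nowMs, long retentionMs) {
        changelog.removeIf(r -> nowMs - r.timestampMs > retentionMs);
    }

    /** State a fresh instance would rebuild by replaying the remaining changelog. */
    Map<String, String> restoreFromChangelog() {
        Map<String, String> restored = new HashMap<>();
        for (ChangelogRecord r : changelog) {
            if (r.value == null) restored.remove(r.key); else restored.put(r.key, r.value);
        }
        return restored;
    }
}
```

After applying a 30-day retention at day 40, a key written at day 0 is still in localStore, but it is missing from restoreFromChangelog(). A tombstone removes the key from both.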

If you want to apply a retention time to a local store, you cannot use a KeyValueStore, but you could use a WindowStore, which supports a retention time.
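The idea behind a retention-bounded store can be sketched as a toy model in the spirit of a WindowStore. It is not Kafka Streams' implementation, and the class ToyRetentionStore and its segment interval are hypothetical. Records are grouped into time segments, and whole segments that fall entirely outside the retention period, measured from the largest timestamp seen so far, are dropped. In the Kafka Streams API itself, such a store is usually built with Stores.windowStoreBuilder(Stores.persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates), keySerde, valueSerde); check the exact signature for your Kafka version.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical toy retention-bounded store; assumes non-negative timestamps. */
final class ToyRetentionStore {
    private final long retentionMs;
    private final long segmentIntervalMs;
    // segment id -> (key -> value), ordered by segment id (i.e. by time)
    private final TreeMap<Long, Map<String, String>> segments = new TreeMap<>();
    private long observedTimeMs = Long.MIN_VALUE;

    ToyRetentionStore(long retentionMs, long segmentIntervalMs) {
        if (retentionMs <= 0 || segmentIntervalMs <= 0) {
            throw new IllegalArgumentException("retention and segment interval must be positive");
        }
        this.retentionMs = retentionMs;
        this.segmentIntervalMs = segmentIntervalMs;
    }

    void put(long timestampMs, String key, String value) {
        observedTimeMs = Math.max(observedTimeMs, timestampMs);
        long segmentId = Math.floorDiv(timestampMs, segmentIntervalMs);
        segments.computeIfAbsent(segmentId, id -> new HashMap<>()).put(key, value);
        dropExpiredSegments();
    }

    /** Value from the newest retained segment containing the key, or null. */
    String get(String key) {
        for (Map<String, String> segment : segments.descendingMap().values()) {
            String value = segment.get(key);
            if (value != null) return value;
        }
        return null;
    }

    private void dropExpiredSegments() {
        long firstLiveSegment = Math.floorDiv(observedTimeMs - retentionMs, segmentIntervalMs);
        // Every segment below firstLiveSegment ends before (observedTime - retention).
        segments.headMap(firstLiveSegment).clear();
    }
}
```

With a 30-day retention and daily segments, writing a record at day 45 drops the segments for days 0 and 10 but keeps the one for day 20.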

Upvotes: 1
