Reputation: 382
I am using the Processor API to delete messages from a state store. The delete works: I confirmed it with an interactive query on the state store by Kafka key. However, it does not reduce the size of the Kafka Streams files on local disk under the directory tmp/kafka-streams.
@Override
public void init(ProcessorContext processorContext) {
    this.processorContext = processorContext;
    processorContext.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, new Punctuator() {
        @Override
        public void punctuate(long l) {
            processorContext.commit();
        }
    }); // invoke punctuate every 10 seconds of stream time
    this.statestore = (KeyValueStore<String, GenericRecord>) processorContext.getStateStore(StateStoreEnum.HEADER.getStateStore());
    log.info("Processor initialized");
}
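@Override
public void process(String key, GenericRecord value) {
    // The iterator returned by all() must be closed, otherwise it leaks RocksDB resources
    try (KeyValueIterator<String, GenericRecord> iterator = statestore.all()) {
        iterator.forEachRemaining(keyValue -> statestore.delete(keyValue.key));
    }
}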
Kafka Streams directory size:
2.3M /private/tmp/kafka-streams
3.3M /private/tmp/kafka-streams
Do I need any specific configuration to keep the file size under control? If it doesn't work this way, is it okay to delete the kafka-streams directory? I assume it should be safe, since such a delete removes the record from both the state store and the changelog topic.
Upvotes: 0
Views: 1850
Reputation: 62330
RocksDB does file compaction in the background. Hence, if you need more aggressive compaction, you should pass in a custom RocksDBConfigSetter via the Streams config parameter rocksdb.config.setter. For more details about RocksDB, check out the RocksDB documentation.
https://docs.confluent.io/current/streams/developer-guide/config-streams.html#rocksdb-config-setter
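As a rough illustration of what such a setter could look like, here is a minimal sketch. The class name AggressiveCompactionConfig and the trigger value are hypothetical choices for illustration, not recommendations, and the close() override assumes Kafka Streams 2.3 or newer (older versions do not have that method on the interface).

```java
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

// Hypothetical class name; the value below is an illustrative assumption, not a tuned setting.
public class AggressiveCompactionConfig implements RocksDBConfigSetter {

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        // Trigger compaction once 2 files accumulate in level 0 (RocksDB's own default is 4).
        options.setLevel0FileNumCompactionTrigger(2);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // Nothing to release: this setter does not create any RocksDB objects of its own.
    }
}
```

The class would then be registered in the Streams properties under rocksdb.config.setter, for example with props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, AggressiveCompactionConfig.class). Kafka Streams instantiates it reflectively, so it needs to be public with a no-argument constructor.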
However, I would not recommend changing RocksDB configs as long as there is no real issue; you can do more harm than good. Your store size seems quite small, so I don't see a real problem at the moment.
By the way: if you go to production, you should change the state.dir config to an appropriate directory where the state will not be lost even after a machine restart. If you put state into the default /tmp location, the state is most likely gone after the machine restarts, and an expensive recovery from the changelog topics would be triggered.
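Setting state.dir is a one-line change in the Streams properties. The sketch below uses plain string keys so it runs without any Kafka dependency; the application id, broker address, and the path /var/lib/kafka-streams are hypothetical placeholders to replace with your own values.

```java
import java.util.Properties;

final class StateDirConfig {

    // Builds Streams properties with the state directory moved off /tmp.
    // The application id and broker address are hypothetical placeholders.
    static Properties buildProperties(final String stateDir) {
        final Properties props = new Properties();
        props.put("application.id", "my-streams-app");
        props.put("bootstrap.servers", "localhost:9092");
        // Same key as StreamsConfig.STATE_DIR_CONFIG
        props.put("state.dir", stateDir);
        return props;
    }

    public static void main(final String[] args) {
        final Properties props = buildProperties("/var/lib/kafka-streams");
        System.out.println("state.dir = " + props.getProperty("state.dir"));
    }
}
```

The resulting Properties object can be passed to the KafkaStreams constructor as usual. The directory should be on a disk that survives reboots and be writable by the user running the application.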
Upvotes: 3