Wizard
Wizard

Reputation: 1184

When does kafka stream app clean it state store?

I have a kafka streams app which is currently just joining two KStreams with a 5-minute window and writing the join result to another topic.

Since I am joining two topics over a time window, my app will have state associated with it. I was under the impression that the state stores in my app would get pruned after every the 5-minute window (Because my app cares only about the 5 min window of events for the join state).

I was expecting a constant disk-space utilization. But, seems like that is not the case. Its been 12 hrs and I do not see that the state store is getting cleaned up. It's consistently growing.

So I have multiple concerns on this now,

  1. When does Kafka Streams app clean up its state?
  2. If one of my app in the kafka streams app cluster fails, and I boot another host and make it join the cluster, after rebalancing, is there orphaned state store sitting in the disk for the partitions that got rebalanced?
  3. My understanding is that the events are joined only if they happen in the defined window, so, why does kafka need to hold on to data that is older than the defined window period in its state store?

Let me know if you need any other information from me regarding my streams app. I am currently running kafka-streams version 2.2.1 and my brokers are also on the same version.

Upvotes: 2

Views: 2150

Answers (1)

Matthias J. Sax
Matthias J. Sax

Reputation: 62350

When does Kafka Streams app clean up its state?

The size of the state depends on the retention period, that is 1 day by default.

Atm, it's not possible to change the retention period for KStream-KStream joins -- it's already WIP to add this feature: https://issues.apache.org/jira/browse/KAFKA-8558

If one of my app in the kafka streams app cluster fails, and I boot another host and make it join the cluster, after rebalancing, is there orphaned state store sitting in the disk for the partitions that got rebalanced?

Yes. However, this state will be cleaned (if you restart Kafka Streams on the recovered host) if the state is not reused after a configurable (state.cleanup.delay.ms) period of time.

My understanding is that the events are joined only if they happen in the defined window, so, why does kafka need to hold on to data that is older than the defined window period in its state store?

Having a higher retention period that your window size allows Kafka Streams to process out-of-order data. Note that Kafka Streams uses event time semantics, not processing time semantics.

Upvotes: 3

Related Questions