Reputation: 35
I use Kafka Streams to join two streams with a 3-day join window:
...
private final long retentionHours = Duration.ofDays(3).toHours();
...
var joinWindow = JoinWindows.of(Duration.ofHours(retentionHours))
    .grace(Duration.ofMillis(0));
var joinStores = StreamJoined.with(Serdes.String(), aggregatorSerde, aggregatorSerde)
    .withStoreName("STORE-1")
    .withName("STORE-2");
stream1.join(stream2, streamJoiner(), joinWindow, joinStores);
With the above implementation, I found that Kafka Streams created a state folder, /tmp/kafka-streams (it looks like RocksDB), and it grows constantly. The state store in the Kafka cluster also grows constantly.
So I changed the stream join implementation to:
...
private final long retentionHours = Duration.ofDays(3).toHours();
...
var joinWindow = JoinWindows.of(Duration.ofHours(retentionHours))
    .grace(Duration.ofMillis(0));
var joinStores = StreamJoined.with(Serdes.String(), aggregatorSerde, aggregatorSerde)
    .withStoreName("STORE-1")
    .withName("STORE-2")
    .withThisStoreSupplier(createStoreSupplier("MEM-STORE-1"))
    .withOtherStoreSupplier(createStoreSupplier("MEM-STORE-2"));
stream1.join(stream2, streamJoiner(), joinWindow, joinStores);
...
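private WindowBytesStoreSupplier createStoreSupplier(String storeName) {
    // The join window spans the time difference before and after, hence * 2.
    var window = Duration.ofHours(retentionHours * 2)
        .toMillis();
    // Arguments: name, retention period (ms), window size (ms), retainDuplicates.
    return new InMemoryWindowBytesStoreSupplier(storeName, window, window, true);
}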
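A note on the factor of 2 in createStoreSupplier: as far as I know, Kafka Streams checks that a custom join store's window size equals the full span of the join window (the time difference before plus the time difference after) and that its retention equals that span plus the grace period. Below is a minimal sketch of that arithmetic using only java.time. JoinStoreSizing and its methods are hypothetical names for illustration, not part of the Kafka Streams API.

```java
import java.time.Duration;

// Illustrative helper only (hypothetical, not Kafka Streams API):
// derives the store settings a stream-stream join is assumed to expect.
final class JoinStoreSizing {

    // A symmetric join window of +/- timeDifference covers 2 * timeDifference.
    static Duration windowSize(Duration timeDifference) {
        return timeDifference.multipliedBy(2);
    }

    // Assumed rule: retention = window size + grace period.
    static Duration retention(Duration timeDifference, Duration grace) {
        return windowSize(timeDifference).plus(grace);
    }

    public static void main(String[] args) {
        Duration timeDifference = Duration.ofDays(3); // the 3-day join window
        Duration grace = Duration.ZERO;               // grace(Duration.ofMillis(0))

        System.out.println("windowSize ms = " + windowSize(timeDifference).toMillis());
        System.out.println("retention ms  = " + retention(timeDifference, grace).toMillis());
    }
}
```

With a 3-day time difference and zero grace, both values come out to 6 days, which matches passing the same number for retention and window size to the supplier.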
Now, there is no state folder: /tmp/kafka-streams.
Does it mean that InMemoryWindowBytesStoreSupplier doesn't use disk at all? If yes, how does it work?
Also, I still see that the state store in the Kafka cluster grows constantly.
Upvotes: 1
Views: 1059
Reputation: 15057
"Does it mean that InMemoryWindowBytesStoreSupplier doesn't use disk at all? If yes, how does it work?"
IIRC, InMemoryWindowBytesStore doesn't use disk at all.
Generally speaking, a logical state store is in fact partitioned into multiple state store 'instances' (think: each stream task has its own, local state store instance). For the InMemoryWindowBytesStore specifically, and by design, these store instances manage all their local data in memory.
"Also, I still see that the state store in the Kafka cluster grows constantly."
However, the InMemoryWindowBytesStore is still fault-tolerant. This is often confusing for new Kafka Streams developers because, in most software, "in memory" implies "data is lost if something happens". This is not the case with Kafka Streams, however. By default (that is, unless you explicitly disable changelogging for the store), a state store is 'backed up' durably to its Kafka changelog topic, regardless of whether you use the default state store (with RocksDB) or the in-memory state store. This explains why you see the in-memory state's (changelog) data in the Kafka cluster. The data should not grow forever, btw: changelog topics are compacted and, IIRC, the changelog topics of windowed stores such as these join stores also have time-based retention, so records older than the store's retention period are eventually deleted. With a 3-day join window, expect the topic to keep growing for several days before it levels off.
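To make the "in memory but still fault-tolerant" idea concrete, here is a toy model. It is not Kafka Streams code: ToyInMemoryStore and Change are hypothetical names, and a plain java.util.List stands in for the changelog topic. It only shows the principle that every write is also appended to a durable log, and that a fresh instance rebuilds its in-memory contents by replaying that log.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only (hypothetical, NOT the Kafka Streams implementation).
final class ToyInMemoryStore {

    // One changelog entry: key, window start timestamp, value.
    static final class Change {
        final String key;
        final long windowStart;
        final String value;

        Change(String key, long windowStart, String value) {
            this.key = key;
            this.windowStart = windowStart;
            this.value = value;
        }
    }

    private final Map<String, String> data = new HashMap<>(); // lives only in RAM
    private final List<Change> changelog;                     // stand-in for the changelog topic

    // On startup, rebuild the in-memory state by replaying the changelog.
    ToyInMemoryStore(List<Change> changelog) {
        this.changelog = changelog;
        for (Change c : changelog) {
            apply(c);
        }
    }

    // Every write goes to the changelog first, then to memory.
    void put(String key, long windowStart, String value) {
        Change c = new Change(key, windowStart, value);
        changelog.add(c);
        apply(c);
    }

    String get(String key, long windowStart) {
        return data.get(compositeKey(key, windowStart));
    }

    private void apply(Change c) {
        data.put(compositeKey(c.key, c.windowStart), c.value);
    }

    private static String compositeKey(String key, long windowStart) {
        return key + "@" + windowStart;
    }

    public static void main(String[] args) {
        List<Change> topic = new ArrayList<>();

        ToyInMemoryStore first = new ToyInMemoryStore(topic);
        first.put("k1", 0L, "a");
        first.put("k2", 0L, "b");

        // Simulate a crash: the first instance and its memory are gone,
        // but the changelog survives and a new instance replays it.
        ToyInMemoryStore restored = new ToyInMemoryStore(topic);
        System.out.println(restored.get("k1", 0L));
        System.out.println(restored.get("k2", 0L));
    }
}
```

The restored instance sees the same values as the one that "crashed", even though nothing was ever written to local disk. The price is that the log (in Kafka's case, the changelog topic) has to hold the data, which is why it still shows up in the cluster.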
Note: What can happen, however, when using the in-memory store is that your application instances could run out of memory (OOM), and thus crash. While your state data will never be lost, as explained above, your application will not be running due to the OOM crash / it will run only partially (some app instances run OOM, others do not). This OOM problem doesn't apply to the default store (RocksDB), as it manages its data on disk, and uses memory (RAM) only for caching purposes. But, again, this question of app availability is orthogonal to data safety (your data is safe regardless of whether your app is crashing or not).
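To get a feel for the OOM risk, a back-of-envelope estimate can help. The sketch below is only that: the input rate and record size are made-up numbers, InMemoryStoreEstimate is a hypothetical helper, and the formula ignores JVM object overhead (so real usage will be higher). It also estimates a single store; a stream-stream join keeps one store per side, and the total is spread across all tasks and app instances.

```java
import java.time.Duration;

// Rough sizing sketch (hypothetical helper; ignores per-record JVM overhead).
final class InMemoryStoreEstimate {

    // bytes ~= records per second * average record size * retention in seconds
    static long estimateBytes(long recordsPerSecond, long avgRecordBytes, Duration retention) {
        long bytesPerSecond = Math.multiplyExact(recordsPerSecond, avgRecordBytes);
        return Math.multiplyExact(bytesPerSecond, retention.getSeconds());
    }

    public static void main(String[] args) {
        long recordsPerSecond = 100;              // assumed input rate
        long avgRecordBytes = 1024;               // assumed serialized key + value size
        Duration retention = Duration.ofDays(6);  // 2 * 3 days, as in the question

        long bytes = estimateBytes(recordsPerSecond, avgRecordBytes, retention);
        System.out.printf("~%.1f GiB for one store%n", bytes / (1024.0 * 1024.0 * 1024.0));
    }
}
```

Even with these modest assumed numbers the result is in the tens of GiB, so with a multi-day window it is worth doing this arithmetic with your real rates before choosing the in-memory store.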
Upvotes: 1