Reputation: 137
My KAFKA Stream java application goes to ERROR status due to an out of memory problem. I use windowed aggregation, mainly in order to calculate median values:
.windowedBy(TimeWindows.of(Duration.ofSeconds(1)).advanceBy(Duration.ofMillis(999)).grace(Duration.ofMillis(1)))
with .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded().withMaxBytes(10000).withLoggingDisabled()))
.windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(2)).grace(Duration.ofMillis(1)))
I have also a steate store:`
StoreBuilder<KeyValueStore<String, Gateway>> kvStoreBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(AppConfigs.GW_STATE_STORE_NAME),
Serdes.String(),
JsonSerdes.getGatewaySerde()
);
// add state store to StreamBuilder
builder.addStateStore(kvStoreBuilder);`
Eclipse memory analyzer says that:
One instance of ‘org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer’ loaded by ‘jdk.internal.loader.ClassLoaders$AppClassLoader @ 0xf00d8558’ occupies 238,753,712 (90.51%) bytes. The memory is accumulated in one instance of ‘java.util.HashMap$Node[]’, loaded by ‘’, which occupies 238,749,768 (90.51%) bytes.
Can anyone explain which should be the root cause ?
Upvotes: 0
Views: 3516
Reputation: 62350
The error is from suppress()
that use the in-memory store (InMemoryTimeOrderedKeyValueBuffer
). suppress()
does not support RocksDB atm (cf https://issues.apache.org/jira/browse/KAFKA-7224).
Your suppress()
config seems to be incorrect:
Suppressed.BufferConfig.unbounded().withMaxBytes(10000).withLoggingDisabled()
The configs unbounded()
and withMaxBytes()
contradict each other: do you want an unbounded or bounded buffer? -- In your case, the second withMaxBytes()
overwrites the first one. Thus, you only provide 10,000
bytes for the suppress buffer. Because you use untilWindowCloses()
, Kafka Streams will need to shut down if it runs out of memory, because it's neither allows to early emit (untilWindowClose()
) not allowed to use more memory (withMaxBytes(...)
).
For untilWindowClose()
you should use unbounded()
. If you want to bound memory, you should not use untilWindowClose()
.
Upvotes: 4
Reputation: 920
You need to tune the rocks DB configuration, please read this https://medium.com/@grinfeld_433/kafka-streams-and-rocksdb-in-the-space-time-continuum-and-a-little-bit-of-configuration-40edb5ee9ed7
If you are using java>=8, set metaspace otherwise it will eat all your server RAM. http://karunsubramanian.com/websphere/one-important-change-in-memory-management-in-java-8/
If using dockers, limit the max memory configs.
There is a bug in old kafka and they recommend to update version. https://issues.apache.org/jira/browse/KAFKA-8637
Upvotes: 0