Giulio De Marco

Reputation: 137

Kafka Streams OOM (Out of Memory)

My Kafka Streams Java application goes into ERROR state due to an out-of-memory problem. I use windowed aggregation, mainly to calculate median values:
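The aggregation code itself is not shown in the question; as a hypothetical illustration of the per-window median step only (pure Java, no Kafka dependencies; the class and method names are assumptions), it might look like this:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class MedianExample {
    // Compute the median of the values collected in one window.
    static double median(List<Double> windowValues) {
        List<Double> sorted = new ArrayList<>(windowValues);
        Collections.sort(sorted);
        int n = sorted.size();
        if (n % 2 == 1) {
            // odd count: the middle element
            return sorted.get(n / 2);
        }
        // even count: the mean of the two middle elements
        return (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;
    }

    public static void main(String[] args) {
        System.out.println(median(List.of(3.0, 1.0, 2.0)));      // 2.0
        System.out.println(median(List.of(4.0, 1.0, 3.0, 2.0))); // 2.5
    }
}
```

Note that a true median needs all values of the window buffered, which is exactly the kind of state that a suppress buffer or state store accumulates.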

I also have a state store:

    StoreBuilder<KeyValueStore<String, Gateway>> kvStoreBuilder = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(AppConfigs.GW_STATE_STORE_NAME),
            Serdes.String(),
            JsonSerdes.getGatewaySerde()
    );
    // add the state store to the StreamsBuilder
    builder.addStateStore(kvStoreBuilder);

Eclipse Memory Analyzer reports:

One instance of 'org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer' loaded by 'jdk.internal.loader.ClassLoaders$AppClassLoader @ 0xf00d8558' occupies 238,753,712 (90.51%) bytes. The memory is accumulated in one instance of 'java.util.HashMap$Node[]', loaded by '', which occupies 238,749,768 (90.51%) bytes.

With VisualVM it seems that

Can anyone explain what the root cause might be?

Upvotes: 0

Views: 3516

Answers (2)

Matthias J. Sax
Matthias J. Sax

Reputation: 62350

The error comes from suppress(), which uses the in-memory store (InMemoryTimeOrderedKeyValueBuffer). suppress() does not support RocksDB at the moment (cf. https://issues.apache.org/jira/browse/KAFKA-7224).

Your suppress() config seems to be incorrect:

Suppressed.BufferConfig.unbounded().withMaxBytes(10000).withLoggingDisabled()

The configs unbounded() and withMaxBytes() contradict each other: do you want an unbounded or a bounded buffer? In your case, the later withMaxBytes() call overwrites the earlier unbounded() setting, so you only provide 10,000 bytes for the suppress buffer. Because you use untilWindowCloses(), Kafka Streams must shut down when it runs out of buffer space: it is neither allowed to emit early (untilWindowCloses()) nor allowed to use more memory (withMaxBytes(...)).

For untilWindowCloses() you should use unbounded(). If you want to bound memory, you should not use untilWindowCloses().
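In code, the two consistent combinations might look like this (a configuration sketch against the Kafka Streams suppress() API; the windowedStream variable and the 5-minute time limit are assumptions, not from the question):

```java
// Option 1: emit only the final result per window. The buffer must be
// unbounded, because untilWindowCloses() is never allowed to emit early.
windowedStream.suppress(
        Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

// Option 2: bound the memory instead. Use a time limit rather than
// untilWindowCloses(), so the buffer may emit early when it hits the byte cap.
windowedStream.suppress(
        Suppressed.untilTimeLimit(
                Duration.ofMinutes(5),
                Suppressed.BufferConfig.maxBytes(10_000).emitEarlyWhenFull()));
```

Option 1 trades memory for exact final-result semantics; Option 2 trades final-result semantics for a hard memory bound.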

Upvotes: 4

Valath
Valath

Reputation: 920

You need to tune the RocksDB configuration; please read this: https://medium.com/@grinfeld_433/kafka-streams-and-rocksdb-in-the-space-time-continuum-and-a-little-bit-of-configuration-40edb5ee9ed7

If you are using Java >= 8, set a metaspace limit; otherwise it will eat all your server's RAM. http://karunsubramanian.com/websphere/one-important-change-in-memory-management-in-java-8/

If you are using Docker, limit the container's maximum memory.

There is a bug in older Kafka versions, and upgrading is recommended. https://issues.apache.org/jira/browse/KAFKA-8637

Upvotes: 0
