Reputation: 1
I have a Kafka Streams app(Kafka Streams 2.1 + Kafka broker 2.0) which does a aggregation based on TimeWindows, and I use the suppress operator to supress the result's output.
Everything works well until I restart my app, it will reset the offset of KTABLE-SUPPRESS-STATE-STORE to 0 to restore the suppression state, as expected. But each time I restart it, it will throw an OutOfMemoryError
, I thought maybe the heap size is not enough, so I use a larger Xmx/Xms
, it works one or two restart, and then the OutOfMemoryError
comes back again. Now the Xmx
is about 20G now, I think something is not right here.
The code snippet:
TimeWindows windows = TimeWindows.of(windowSize).until(retentionHours.toMillis()).grace(graceHours);
KTable<Windowed<String>, MyStatistics> kTable = groupedBySerialNumber
.windowedBy(windows)
.aggregate(MyStatistics::new,
(sn, resList, stats) -> stats.addResources(resList).updateSN(sn),
Materialized.with(Serdes.String(), ArchiveSerdes.resourceStatistics()))
.suppress(Suppressed.untilTimeLimit(timeToWait, Suppressed.BufferConfig.maxBytes(bufferMaxBytes)));
And I find that the key of record in KTABLE-SUPPRESS-STATE-STORE is something like 1234567j�P, which is not readable, but I guess it's generated by combine the SN and window, I think this will make KTABLE-SUPPRESS-STATE-STORE redundent, because each SN will have multi records for each window.
I have two questions:
OutOfMemoryError
indicates a small heap size or not, if so, how to limit the rate, if not, what does it means?Thanks!
Edit in 2019/4/16
The error stacktrace is:
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541)
at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1243)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1188)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:88)
at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Upvotes: 0
Views: 2466
Reputation: 726
If the OutOfMemoryError indicates a small heap size or not, if so, how to limit the rate, if not, what does it means?
Yes, there's not enough heap to allocate all the memory that the application needs to operate. We don't see this very often, and the suppression operator is new, so I'm suspicious of it, but it's good to bear in mind that basically any data structure in your application could be responsible.
The best way to diagnose memory pressure is to do a "heap dump". This basically copies your JVM's entire memory into a file, so that you can analyse its contents using a program like https://www.eclipse.org/mat/ . It'll be a little bit of a learning curve, but I think you'll find that some facility with analyzing memory usage is very handy in general.
You can trigger a heap dump at any time (there are several ways to do it, you'll have to research the best way for you). But I think you'll want to make use of Java's nifty option to do a heap dump when it gets an out-of-memory error. This way, you're more likely to positively identify the culprit. See https://docs.oracle.com/javase/7/docs/webnotes/tsg/TSG-VM/html/clopts.html#gbzrr , or similar for your JVM.
I can speculate about the cause of the heap dump, but I'm afraid I might just lead you astray and waste your time. Once you have the results of the dump, I think you should go ahead and open a bug report in the Kafka issue tracker: https://issues.apache.org/jira/projects/KAFKA . Then, we can help figure out both how to work around the bug to get you running again, and also how to fix it in future releases.
Actually, I will offer one speculation... It's possible you're seeing a result of this bug: https://github.com/apache/kafka/pull/6536 (https://issues.apache.org/jira/browse/KAFKA-7895) . If your OOME goes away when you remove the suppression operator, you might want to leave it out for now. As soon as we merge the fix, I'll request a bugfix release, and you can try again to see if the problem is resolved.
The key for KTABLE-SUPPRESS-STATE-STORE is defined by which API, how or should can I control it?
Fortunately, this has a more straightforward answer. The key you're looking at is a binary-packed version of your record key and the timestamp of the window. This key is a result of your usage of windowBy
. In Java, you can see that the result of the aggregation is a KTable<Windowed<String>, ...>
and that Suppress doesn't change the key or value type. In other words, you're looking at a serialized version of the key (Windowed<String>
).
Leaving suppression aside for a second; Let's say you have two serial numbers, "asdf" and "zxcv". Let's say your window size is one hour. Your application is grouping events for each of those serial numbers (independently) in each hour of the day. So there's an aggregation for all the "asdf" records from 9:00 to 10:00, and there's also one for all the "zxcv" records from 9:00 to 10:00. Thus, the total number of keys in the windowed KTable is key space
x number of windows being retained
.
The Suppression operator will have no effect on the number of keys in the KTable. Its purpose is to suppress updates to those keys for a specified amount of time (timeToWait
). For example, without suppress, if you get 3 updates to the "asdf" record between 9:00 and 10:00, the windowed aggregation will emit an updated result for (asdf, 9:00)
each time, so for 3 events in, you see 3 result updates coming out. The Suppress operator just prevents those result updates until timeToWait
has passed, and when it does pass, it emits only the most recent update.
So, the number of keys in the suppression buffer at any time is smaller than the total number of keys in the upstream KTable. It just contains the keys that have been updated in the last timeToWait
amount of time.
Does that help?
Upvotes: 3