Reputation: 91
For the last couple of weeks I have been building a DataStream program in Flink with Scala, but I am seeing strange behavior: Flink uses a lot more memory than I expected.
I have four ListState of (Int, Long) tuples in my ProcessFunction, which is keyed by an Int. I use them to compute unique counts over different time frames, and I expected most of the memory to be used by these lists; a simplified sketch of the setup is shown below.
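The state declaration looks roughly like this (a simplified sketch, not my real job; the class, field, and type names are just placeholders):

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Placeholder event/result types standing in for my real case classes.
case class MyEvent(key: Int, fingerprint: Int, eventTime: Long)
case class MyResult(key: Int, uniqueCount: Long)

class UniqueCounterFunction extends ProcessFunction[MyEvent, MyResult] {

  // one ListState[(Int, Long)] per time frame; Flink scopes each list to the current key
  private var frame1: ListState[(Int, Long)] = _
  private var frame2: ListState[(Int, Long)] = _
  private var frame3: ListState[(Int, Long)] = _
  private var frame4: ListState[(Int, Long)] = _

  override def open(parameters: Configuration): Unit = {
    def desc(name: String) =
      new ListStateDescriptor[(Int, Long)](name, createTypeInformation[(Int, Long)])
    frame1 = getRuntimeContext.getListState(desc("frame1"))
    frame2 = getRuntimeContext.getListState(desc("frame2"))
    frame3 = getRuntimeContext.getListState(desc("frame3"))
    frame4 = getRuntimeContext.getListState(desc("frame4"))
  }

  override def processElement(value: MyEvent,
                              ctx: ProcessFunction[MyEvent, MyResult]#Context,
                              out: Collector[MyResult]): Unit = {
    // add the fingerprint to each time frame's list and emit the updated unique counts
  }
}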
But that is not the case. So I took a live class histogram of the JVM, and I was surprised by how much memory is being used:
num #instances #bytes class name
----------------------------------------------
1: 138920685 6668192880 java.util.HashMap$Node
2: 138893041 5555721640 org.apache.flink.streaming.api.operators.InternalTimer
3: 149680624 3592334976 java.lang.Integer
4: 48313229 3092046656 org.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry
5: 14042723 2579684280 [Ljava.lang.Object;
6: 4492 2047983264 [Ljava.util.HashMap$Node;
7: 41686732 1333975424 com.myJob.flink.tupleState
8: 201 784339688 [Lorg.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry;
9: 17230300 689212000 com.myJob.flink.uniqStruct
10: 14025040 561001600 java.util.ArrayList
11: 8615581 413547888 com.myJob.flink.Data$FingerprintCnt
12: 6142006 393088384 com.myJob.flink.ProcessCountStruct
13: 4307549 172301960 com.myJob.flink.uniqresult
14: 4307841 137850912 com.myJob.flink.Data$FingerprintUniq
15: 2153904 137849856 com.myJob.flink.Data$StreamData
16: 1984742 79389680 scala.collection.mutable.ListBuffer
17: 1909472 61103104 scala.collection.immutable.$colon$colon
18: 22200 21844392 [B
19: 282624 9043968 org.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
20: 59045 6552856 [C
21: 33194 2655520 java.nio.DirectByteBuffer
22: 32804 2361888 sun.misc.Cleaner
23: 35 2294600 [Lscala.concurrent.forkjoin.ForkJoinTask;
24: 640 2276352 [Lorg.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
25: 32768 2097152 org.apache.flink.core.memory.HybridMemorySegment
26: 12291 2082448 java.lang.Class
27: 58591 1874912 java.lang.String
28: 8581 1372960 java.lang.reflect.Method
29: 32790 1311600 java.nio.DirectByteBuffer$Deallocator
30: 18537 889776 java.util.concurrent.ConcurrentHashMap$Node
31: 4239 508680 java.lang.reflect.Field
32: 8810 493360 java.nio.HeapByteBuffer
33: 7389 472896 java.util.HashMap
34: 5208 400336 [I
The (Int, Long) tuple is com.myJob.flink.tupleState, in 7th position, and it uses less than 2 GB of memory.
I don't understand why Flink uses this much memory for the other classes.
Can anyone shed some light on this behavior? Thanks in advance.
Update:
I run the job on a standalone cluster (1 JobManager, 3 TaskManagers).
The Flink version is 1.5-SNAPSHOT, commit e4486ae.
I took the live histogram on one TaskManager node.
Update 2:
In my ProcessFunction I use:
ctx.timerService.registerProcessingTimeTimer(ctx.timestamp + 100)
Then, in the onTimer callback, I go through my ListState to check all the old data. So a timer is created for each call to processElement (a stripped-down version of the pattern is shown below).
But why are the timers still in memory after onTimer has fired?
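A stripped-down version of the pattern, reusing the placeholder types from the first sketch (again, not my real code):

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

class TimerPerElementFunction extends ProcessFunction[MyEvent, MyResult] {

  private var seen: ListState[(Int, Long)] = _

  override def open(parameters: Configuration): Unit = {
    seen = getRuntimeContext.getListState(
      new ListStateDescriptor[(Int, Long)]("seen", createTypeInformation[(Int, Long)]))
  }

  override def processElement(value: MyEvent,
                              ctx: ProcessFunction[MyEvent, MyResult]#Context,
                              out: Collector[MyResult]): Unit = {
    seen.add((value.fingerprint, value.eventTime))
    // every element registers its own timer, 100 ms after the element's timestamp
    ctx.timerService.registerProcessingTimeTimer(ctx.timestamp + 100)
  }

  override def onTimer(timestamp: Long,
                       ctx: ProcessFunction[MyEvent, MyResult]#OnTimerContext,
                       out: Collector[MyResult]): Unit = {
    // go through the list state and drop entries that are now too old
  }
}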
Upvotes: 1
Views: 1886
Reputation: 3229
How many windows do you end up with? Based on the top two entries, what you are seeing are the "timers" Flink uses to track when to clean up each window. For every key in a window you effectively end up with a (key, endTimestamp) entry in the timer state. If you have a very large number of windows (perhaps due to out-of-order events or delayed watermarking), or a very large number of keys in each window, each of those timers takes up memory.
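To make that concrete: if every element registers its own timer (like the ctx.timestamp + 100 pattern above), each key accumulates one InternalTimer per element until it fires. One way to keep the number of live timers bounded is to coalesce them, e.g. round the target time up so that repeated registrations for a key land on the same timestamp; registering a timer that already exists for that key and timestamp is a no-op. Roughly, inside processElement (using the placeholder types from your sketch):

override def processElement(value: MyEvent,
                            ctx: ProcessFunction[MyEvent, MyResult]#Context,
                            out: Collector[MyResult]): Unit = {
  // round the cleanup time up to the next full second so that many elements
  // of the same key share a single timer instead of one timer each
  val target = ctx.timestamp + 100
  val coalesced = (target / 1000 + 1) * 1000
  ctx.timerService.registerProcessingTimeTimer(coalesced)
}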
Note that even if you are using RocksDB state, the TimerService keeps its state on the heap, so you have to watch out for that.
Upvotes: 0