WHou

Reputation: 1

Flink job runs successfully on YARN but runs out of memory on Kubernetes

We have a Flink job that reads data from Hive and joins it with streaming data from Kafka.

It runs successfully on YARN, but when we run it on Kubernetes with exactly the same memory settings, it fails with this error:

java.io.IOException: Insufficient number of network buffers: required 2, but only 1 available. The total number of network buffers is currently set to 57343 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max'.
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalCreateBufferPool(NetworkBufferPool.java:340)
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:322)
    at org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:215)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.setup(ResultPartition.java:139)
    at org.apache.flink.runtime.taskmanager.ConsumableNotifyingResultPartitionWriterDecorator.setup(ConsumableNotifyingResultPartitionWriterDecorator.java:88)
    at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:869)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:635)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:543)
    at java.lang.Thread.run(Thread.java:748)

I followed the suggestion in the message and increased taskmanager.memory.network.fraction. The job then failed with an OutOfMemoryError instead:

Caused by: java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at di.flink.shadow.org.apache.parquet.bytes.HeapByteBufferAllocator.allocate(HeapByteBufferAllocator.java:32)
    at di.flink.shadow.org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1166)
    at di.flink.shadow.org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:805)
    at org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(ParquetRecordReader.java:226)
    at org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(ParquetRecordReader.java:207)
    at org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(ParquetInputFormat.java:233)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:193)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:719)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:543)
    at java.lang.Thread.run(Thread.java:748)

I even increased the TaskManager process size from 16 GB to 32 GB on Kubernetes, but the same error still shows up. The Kubernetes pod resource usage metrics show that 3-5 pods consume much more memory than average, and their memory usage keeps growing at runtime.

Is there any known issue with memory usage on Kubernetes, especially for network buffers, and where can I find metrics to debug this?

Upvotes: 0

Views: 429

Answers (1)

WHou

Reputation: 1

I found the issue. In the Docker entrypoint script, the TaskManager runs

TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}

to overwrite taskmanager.numberOfTaskSlots in flink-conf. However, /proc/cpuinfo lists all physical CPU cores on the host, not just the cores assigned to the container. In my case taskmanager.numberOfTaskSlots was therefore set to 32, which caused a few containers to do most of the work while the rest sat idle.
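
For anyone hitting the same thing, here is a rough sketch of how the default could be derived from the container's CPU quota instead of the host's /proc/cpuinfo. It is not what the official entrypoint does. The function name container_cpus and its optional cgroup-root argument are made up for illustration, and it assumes the pod has a CPU limit set so that the cgroup files contain a quota (cpu.max on cgroup v2, cpu.cfs_quota_us and cpu.cfs_period_us on cgroup v1). Without a limit it falls back to the host core count, as before.

```shell
#!/bin/sh
# Hypothetical helper: print the number of CPUs the container may use,
# rounded up, based on its cgroup CPU quota.
# $1 (optional) is the cgroup mount point; it defaults to /sys/fs/cgroup.
container_cpus() (
    root="${1:-/sys/fs/cgroup}"
    quota=""
    period=""
    if [ -r "$root/cpu.max" ]; then
        # cgroup v2: one line, "<quota> <period>" or "max <period>"
        read -r quota period < "$root/cpu.max"
    elif [ -r "$root/cpu/cpu.cfs_quota_us" ] && [ -r "$root/cpu/cpu.cfs_period_us" ]; then
        # cgroup v1: quota is -1 when no limit is set
        read -r quota < "$root/cpu/cpu.cfs_quota_us"
        read -r period < "$root/cpu/cpu.cfs_period_us"
    fi
    if [ "$quota" -gt 0 ] 2>/dev/null && [ "$period" -gt 0 ] 2>/dev/null; then
        # Round up so that e.g. 1.5 CPUs yields 2
        echo $(( ($quota + $period - 1) / $period ))
    else
        # No usable quota: same behaviour as the original entrypoint line
        grep -c ^processor /proc/cpuinfo
    fi
)

# An explicitly set value still wins; only the default changes.
TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(container_cpus)}
```

The simpler workaround is to skip the detection entirely and set TASK_MANAGER_NUMBER_OF_TASK_SLOTS explicitly in the pod spec, since the entrypoint line only computes a value when that variable is unset.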

Upvotes: 0
