Reputation: 480
I have a job running on Flink 1.14.3 (Java 11) that uses RocksDB as the state backend. The problem is that the job requires an amount of memory pretty much equal to the overall state size.
To keep it stable (and capable of taking snapshots), these are the settings I have in place:
state.backend: rocksdb
state.backend.incremental: 'true'
state.backend.rocksdb.localdir: /opt/flink/rocksdb <-- SSD volume (see below)
state.backend.rocksdb.memory.managed: 'true'
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
taskmanager.memory.managed.fraction: '0.9'
taskmanager.memory.framework.off-heap.size: 512mb
taskmanager.numberOfTaskSlots: '4' (parallelism: 16)
Also this:
- name: rocksdb-volume
  volume:
    emptyDir:
      sizeLimit: 100Gi
    name: rocksdb-volume
  volumeMount:
    mountPath: /opt/flink/rocksdb
This provides plenty of disk space for each task manager. With those settings the job runs smoothly and, in particular, there is a relatively large memory margin. The problem is that memory consumption slowly increases, and that with a smaller margin the snapshots fail. I have tried reducing the number of task managers, but I need 4. The same goes for the amount of RAM: I have tried giving e.g. 16 GB instead of 30 GB per TM, but the problem is the same. Another setup that has worked for us is 8 TMs with 16 GB of RAM each, but again that adds up to the same total memory as the current settings. And even with that amount of memory, I can see that consumption keeps growing and will probably lead to a bad end...
Also, the latest snapshot took around 120 GB, so as you can see I am using an amount of RAM similar to the size of the total state, which defeats the whole purpose of using a disk-based state backend (RocksDB) plus local SSDs.
Is there an effective way of limiting the memory that RocksDB takes (to what is available on the running pods)? Nothing I have found or tried so far has worked. In theory, the images I am using ship with jemalloc for memory allocation, which should avoid the memory fragmentation issues observed with malloc in the past.
UPDATE 1: Attached is the memory evolution with `taskmanager.memory.managed.fraction` set to 0.25 and to 0.1. Apparently, the job still ends up using all the available memory in the long run.
UPDATE 2: Following David's suggestion, I have tried lowering `taskmanager.memory.managed.fraction` as well as the total amount of memory. It turns out the job runs smoothly with 8 GB per TM if the managed fraction is set to 0.2; with the fraction at 0.9, however, the job fails to start (due to lack of memory) unless 30 GB per TM are given. The following screenshot shows the memory evolution with the managed fraction set to 0.2 and the TM memory set to 8 GB. At around 13:50 a significant amount of memory was freed as a result of taking a snapshot (which worked well). Overall, the job looks pretty stable now...
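For reference, this is roughly what that stable setup corresponds to in flink-conf.yaml (the process-size key is just my way of expressing the 8 GB figure, not copied verbatim from the job). Assuming Flink's default JVM overhead (capped at 1 GB) and 256 MB of metaspace, an 8 GB process leaves about 7 GB of Total Flink Memory, so a 0.2 fraction gives RocksDB roughly 1.4 GB of managed memory and still leaves a few GB of heap; a 0.9 fraction would leave no room for the heap and network buffers at all, which is consistent with the failure to start:
taskmanager.memory.process.size: 8g
taskmanager.memory.managed.fraction: '0.2'
state.backend.rocksdb.memory.managed: 'true'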
Upvotes: 2
Views: 3635
Reputation: 52
Just came across this post -- thought I'd share an article from the Shopify Engineering blog (no, I don't work there).
The summary is: consider disabling RocksDB block cache via custom RocksDBOptionsFactory.
https://shopify.engineering/optimizing-apache-flink-applications-tips
You can take a look at section 7, "Understand RocksDB Memory Usage", which I shall attempt to copy-paste below.
We also observed another memory-related issue that was very complex to debug that happened whenever we:
- started one of the Flink applications with a lot of state
- waited for at least an hour
- manually terminated one of the Task Manager containers.
We expected a replacement Task Manager to be added to the cluster (thanks to the Kubernetes Deployment) and the application recovery to happen shortly after that. But instead, we noticed another one of our Task Managers crashing with an “Out of Memory” error, leading to a never-ending loop of crashes and restarts.
We confirmed that the issue only happens for an application with a lot of state that’s been running for at least an hour. We were surprised to realize that the “Out of Memory” errors didn’t come from the JVM—heap profiling using async-profiler and VisualVM didn’t show any problems. But still, something led to increased memory usage and eventually forced Kubernetes runtime to kill a pod violating its memory limits.
So we started to suspect that the RocksDB state backend was using a lot of native memory outside of JVM. We configured jemalloc to periodically write heap dumps to a local filesystem so we could analyze them with jeprof. We were able to capture a heap dump just before another “Out of Memory” error and confirmed RocksDB trying to allocate more memory than it was configured to use:
Total: 6743323999 B
5110634379 75.8% 75.8% 5110634379 75.8% rocksdb::UncompressBlockContentsForCompressionType
1438188896 21.3% 97.1% 1438188896 21.3% os::malloc@bf9490
134350858 2.0% 99.1% 134350858 2.0% rocksdb::Arena::AllocateNewBlock
22944600 0.3% 99.4% 27545943 0.4% rocksdb::LRUCacheShard::Insert
22551264 0.3% 99.8% 5133185644 76.1% rocksdb::BlockBasedTable::PartitionedIndexIteratorState::NewSecondaryIterator
4732478 0.1% 99.9% 4732478 0.1% rocksdb::LRUHandleTable::Resize
2845444 0.0% 99.9% 2845444 0.0% std::string::_Rep::_S_create
1333241 0.0% 99.9% 1333241 0.0% inflate
1310779 0.0% 99.9% 1310779 0.0% rocksdb::WritableFileWriter::Append
1191205 0.0% 100.0% 2633942 0.0% _init
In this particular example, Flink Managed Memory was configured to use 5.90 GB, but the profile clearly shows 6.74 GB being used.
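(A quick aside from the quote: if you want to produce similar dumps yourself, jemalloc's built-in profiler can be asked to write a heap profile periodically, and jeprof can then summarize it. The interval, prefix and java path below are only illustrative, and the jemalloc build has to have profiling enabled:)
MALLOC_CONF="prof:true,lg_prof_interval:30,prof_prefix:/tmp/jem" <-- dump a profile roughly every 2^30 bytes (~1 GiB) allocated
jeprof --show_bytes --text /path/to/java /tmp/jem.*.heap <-- summarize which native call sites hold the memory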
We found a relevant RocksDB issue that supports this claim: many users of the library reported various memory-related issues over the last three years. We followed one of the suggestions in the issue and tried to disable RocksDB block cache using a custom RocksDBOptionsFactory:
import org.apache.flink.contrib.streaming.state.{ConfigurableRocksDBOptionsFactory, RocksDBOptionsFactory}
import org.rocksdb.DBOptions
import org.rocksdb.ColumnFamilyOptions
import org.rocksdb.BlockBasedTableConfig
import org.apache.flink.configuration.ReadableConfig
import java.util

class NoBlockCacheRocksDbOptionsFactory extends ConfigurableRocksDBOptionsFactory {
  override def createDBOptions(currentOptions: DBOptions, handlesToClose: util.Collection[AutoCloseable]): DBOptions = {
    currentOptions
  }

  override def createColumnOptions(
      currentOptions: ColumnFamilyOptions,
      handlesToClose: util.Collection[AutoCloseable]
  ): ColumnFamilyOptions = {
    val blockBasedTableConfig = new BlockBasedTableConfig()
    blockBasedTableConfig.setNoBlockCache(true)
    // Needed in order to disable block cache
    blockBasedTableConfig.setCacheIndexAndFilterBlocks(false)
    blockBasedTableConfig.setCacheIndexAndFilterBlocksWithHighPriority(false)
    blockBasedTableConfig.setPinL0FilterAndIndexBlocksInCache(false)
    currentOptions.setTableFormatConfig(blockBasedTableConfig)
    currentOptions
  }

  override def configure(configuration: ReadableConfig): RocksDBOptionsFactory = {
    this
  }
}
It worked! Now, even after killing a Task Manager we didn’t observe any memory issues.
Disabling RocksDB block cache didn’t affect performance. In fact, we only saw a difference during the time it takes to populate the cache. But there was no difference in performance between a Flink application with disabled RocksDB block cache and a Flink application with full RocksDB block cache. This also explains why we needed to wait in order to reproduce the issue: we were waiting for the block cache to fill. We later confirmed it by enabling the RocksDB Native Metrics.
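If you want to try the same change, the custom factory is normally wired in via flink-conf.yaml, and the RocksDB native metrics mentioned at the end of the quote can be switched on there as well. The fully-qualified class name below is just a placeholder for wherever you package the factory:
state.backend.rocksdb.options-factory: com.yourorg.NoBlockCacheRocksDbOptionsFactory
state.backend.rocksdb.metrics.block-cache-usage: 'true'
state.backend.rocksdb.metrics.block-cache-capacity: 'true'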
Upvotes: 2
Reputation: 43697
RocksDB is designed to use all of the memory you give it access to -- so if it can fit all of your state in memory, it will. And given that you've increased taskmanager.memory.managed.fraction from 0.4 to 0.9, it's not surprising that your overall memory usage approaches its limit over time.
If you give RocksDB rather less memory, it should cope. Have you tried that?
Upvotes: 2