kopaka

Reputation: 792

Flink excessive load with Checkpointing RocksDB

Currently, we're trying to figure out how to work with Flink efficiently, and we're still trying to make sense of everything.

We are running about 60 really lightweight jobs on a standalone cluster, which works fine on an average EC2 instance. However, once I enabled checkpointing with a local RocksDB state backend, the cluster behaves unexpectedly: it stops jobs, tries to restart them, then discards all of them and leaves the error logs empty. After that, no trace of either jobs or JARs is left in Flink. I am aware that a fraction of the total JobManager memory is reserved for each job, and likewise that a local RocksDB instance is created on the same machine for each job, but I assumed these would be equally lightweight and not require much memory/CPU capacity. Just adding the line env.enableCheckpointing(1000); led to a total failure of everything, whereas the cluster was stable before.

I personally think we may have reached the limit of our standalone Flink cluster, and that even increasing the memory would no longer suffice, but I'd like to have that confirmed before we start building a distributed Flink cluster (we'd need to automate everything, which is why I'm hesitant right now). I am also not sure whether storing the RocksDB checkpoints in some dedicated storage like S3 would even tackle this problem, or whether it would affect resource consumption (other than disk) at all.
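For reference, this is roughly what I assume pointing the checkpoints at S3 would look like (sketch only; the bucket name is a placeholder, and it would require an S3 filesystem plugin such as flink-s3-fs-hadoop on the cluster):

try {
    // Sketch only: placeholder bucket; assumes an S3 filesystem plugin is available.
    env.setStateBackend((StateBackend) new RocksDBStateBackend("s3://some-bucket/flink-checkpoints"));
} catch (IOException e1) {
    e1.printStackTrace();
}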

Is moving to a distributed environment the only way to solve our problem, or does this indicate some other problem that could be fixed by proper configuration?

Edit: Maybe I should add that there is no load yet; we are not talking about incoming data, but only about keeping the jobs running. There are a mere 100 records in the FlinkSources right now, and we don't even reach the point where they would be processed.

Edit2:

This part was always part of the jobs' code:

try {
    env.setStateBackend((StateBackend) new RocksDBStateBackend("file://" + "/somePathOnInstance"));
} catch (IOException e1) {
    // TODO Auto-generated catch block
    e1.printStackTrace();
}

and we added the following line of code:

env.enableCheckpointing(CHECKPOINTING_INTERVAL_MS);

The typecast to StateBackend should not be necessary, as the 1.9.1 version of the RocksDBStateBackend class should already implement StateBackend instead of AbstractStateBackend, according to the documentation. However, the doc does not match the actual class we get from Maven, so there's that.

Upvotes: 0

Views: 633

Answers (1)

David Anderson

Reputation: 43707

Given that the 60 jobs you are running have rather trivial workloads, it is reasonable to think that turning on checkpointing is having a significant impact. Basically, I suspect that having 60-180 new threads (I'm not sure which of your operators are stateful) all trying to write frequently to the filesystem is overwhelming your t3.medium instance.

Checkpointing is managed by the checkpoint coordinator (in the Flink master), which communicates with all of the jobs, initiating the checkpoints, waiting for them to complete, and managing the metadata. Most of the work involved is done by the task managers, and is done asynchronously, so in your case that's a lot of new threads, each of which is copying the data being checkpointed to your checkpoint store (which should be a distributed filesystem, like S3). If the checkpointing interval is short, e.g., one second, then this is all happening every second.
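If you keep checkpointing enabled, one thing worth trying is to checkpoint much less aggressively. A minimal sketch, with placeholder values rather than tuned recommendations:

// Sketch: checkpoint less often so 60 jobs aren't all writing every second.
// The interval and pause values below are placeholders, not recommendations.
env.enableCheckpointing(60_000);                                  // e.g. once a minute instead of every second
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);  // guaranteed gap between checkpoints
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);         // never let checkpoints overlap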

You could inspect various metrics to try to figure out where the bottleneck is -- you may be limited by memory, or CPU, or network bandwidth.
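For example, the monitoring REST API on the Flink master (port 8081 by default) exposes per-TaskManager metrics. The sketch below assumes default host/port and standard JVM metric names; the TaskManager id is a placeholder you can get from GET /taskmanagers:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;

public class TaskManagerMetrics {
    public static void main(String[] args) throws Exception {
        // Sketch: host, port, and the taskmanager id are placeholders for your setup.
        String tmId = args[0]; // from GET http://localhost:8081/taskmanagers
        URL url = new URL("http://localhost:8081/taskmanagers/" + tmId
                + "/metrics?get=Status.JVM.CPU.Load,Status.JVM.Memory.Heap.Used,Status.JVM.Threads.Count");
        try (BufferedReader in = new BufferedReader(new InputStreamReader(url.openStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line); // JSON array of {"id": ..., "value": ...} pairs
            }
        }
    }
}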

With RocksDB, incremental checkpointing is generally lighter weight than full checkpoints, so selecting that option can help -- though with such small amounts of state, I don't expect it to make much difference here. More importantly, for performance reasons you should avoid using EBS (or any other network-attached storage) as the local disk for RocksDB.
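As a rough sketch of both suggestions (the checkpoint URI and paths are placeholders; the second constructor argument enables incremental checkpoints):

// Sketch only: URI and paths are placeholders.
// The second constructor argument turns on incremental checkpoints;
// the constructor throws IOException, so keep your existing try/catch around it.
RocksDBStateBackend rocksDb =
        new RocksDBStateBackend("s3://some-bucket/flink-checkpoints", true);
// Keep RocksDB's working files on instance-local disk rather than EBS.
rocksDb.setDbStoragePath("/mnt/local-ssd/rocksdb");
env.setStateBackend((StateBackend) rocksDb);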

Upvotes: 2
