Reputation: 79
Currently I'm using FsStateBackend for checkpointing state, with a 10 s interval as in the code below. But the S3 cost for the bucket used for checkpoints is approximately $20/day, and AWS S3 request pricing is $0.005 per 1,000 requests, so I'm making ~4,000,000 requests/day(!). I have 7 jobs, which run on Flink on AWS EMR.
The average state size per checkpoint ranges from 8 KB to 30 MB. What happens behind the scenes during a checkpoint?
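(Roughly, assuming all of those requests come from checkpointing: 7 jobs × 86,400 s / 10 s ≈ 60,480 checkpoints per day, so ~4,000,000 / 60,480 ≈ 66 S3 requests per checkpoint, and 4,000,000 / 1,000 × $0.005 ≈ $20/day.)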
// set up checkpoint
env.enableCheckpointing(10000); // checkpoint interval: 1000 ms or 10000 ms, depending on the job
// advanced options:
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoints have to complete within one minute, or are discarded
// env.getCheckpointConfig().setCheckpointTimeout(60000);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// folder to checkpoint
StateBackend backend = new FsStateBackend(checkpointPath, true);
env.setStateBackend(backend);
Upvotes: 0
Views: 895
Reputation: 43419
Which implementation of S3 are you using for checkpointing? It makes a big difference.
While you must use the Hadoop implementation of S3 with the StreamingFileSink, it can be a poor choice for checkpointing. The Hadoop S3 FS tries to imitate a real filesystem on top of S3 (for example, by checking whether "parent directories" exist before writing keys).
As a result the Hadoop S3 FS has very high "create file" latency and it quickly runs into request rate limits, because those existence checks are HEAD requests, which have very low request rate limits on S3.
Presto S3 doesn't try to do any of that magic; it simply does PUT/GET operations without the extra machinery around them. Because Flink's checkpointing assumes nothing more than that from S3, the Presto implementation is more efficient and more consistent.
Furthermore, with Hadoop S3 you may run into situations where a restore fails because a state file appears to be missing (a HEAD request leads to false caching in an S3 load balancer). Only after a while does the file become visible, and only then does the restore succeed.
Note, however, that there are also problems with using Presto S3 for checkpointing. See FLINK-24392. Neither implementation is ideal.
It is possible, by the way, to use the Hadoop version for the sink and the Presto version for checkpointing at the same time. In that case you should explicitly use s3a:// as the scheme for the sink's paths (Hadoop) and s3p:// for the checkpoint path (Presto), as in the sketch below.
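For example, a minimal sketch of that setup (the bucket name, paths, and the simple row-format sink are placeholders for illustration; it assumes both the flink-s3-fs-hadoop and flink-s3-fs-presto filesystem plugins are installed):
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class TwoS3SchemesExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoints go through the Presto S3 filesystem (s3p://).
        env.enableCheckpointing(10000);
        env.setStateBackend(new FsStateBackend("s3p://my-bucket/checkpoints", true));

        // The sink writes through the Hadoop S3 filesystem (s3a://),
        // which StreamingFileSink requires.
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("s3a://my-bucket/output"),
                              new SimpleStringEncoder<String>("UTF-8"))
                .build();

        env.fromElements("a", "b", "c").addSink(sink);
        env.execute("s3a for sink, s3p for checkpoints");
    }
}
The two schemes are resolved independently at runtime, one by each plugin, so the checkpoint traffic avoids the Hadoop FS's HEAD-request overhead while the sink keeps the Hadoop semantics it needs.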
Upvotes: 4