Reputation: 9961
I have a standalone HA Flink cluster running that takes checkpoints of my flow every minute, but I don't see them in the state.backend.fs.checkpointdir directory.
flink-conf.yaml
jobmanager.heap.mb: 1024
jobmanager.web.port: 8081
taskmanager.data.port: 6121
taskmanager.heap.mb: 2048
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.preallocate: false
taskmanager.tmp.dirs: /flink/data/task_manager
blob.server.port: 6130
blob.storage.directory: /flink/data/blob_storage
parallelism.default: 4
state.backend: filesystem
state.backend.fs.checkpointdir: s3a://example-staging-flink/checkpoints
restart-strategy: none
restart-strategy.fixed-delay.attempts: 2
restart-strategy.fixed-delay.delay: 60s
recovery.mode: zookeeper
recovery.zookeeper.quorum: zookeeper-1.stag.local:2181,zookeeper-2.stag.local:2181,zookeeper-3.stag.local:2181
recovery.zookeeper.path.root: /example_staging/flink
recovery.zookeeper.storageDir: s3a://example-staging-flink/recovery
recovery.jobmanager.port: 6123
fs.hdfs.hadoopconf: /flink/conf
As you can see, the checkpoints should be saved to the s3a://example-staging-flink/checkpoints directory, but I don't see them:
~ s3cmd ls s3://example-staging-flink/
DIR s3://example-staging-flink/recovery/
~ s3cmd ls s3://example-staging-flink/recovery/
DIR s3://example-staging-flink/recovery/blob/
2016-04-15 10:33 1137280 s3://example-staging-flink/recovery/completedCheckpoint6eab84c79b02
2016-04-15 01:23 506961 s3://example-staging-flink/recovery/completedCheckpoint9e8f3d1254aa
2016-04-15 09:39 149987 s3://example-staging-flink/recovery/submittedJobGraph0bf82ada1dc6
~ s3cmd ls s3://example-staging-flink/recovery/blob/
DIR s3://example-staging-flink/recovery/blob/cache/
~ s3cmd ls s3://example-staging-flink/recovery/blob/cache/
2016-04-14 13:00 3023995 s3://example-staging-flink/recovery/blob/cache/blob_0b6e57360c05128b3c91d75341785df64b91217b
2016-04-15 09:39 3066784 s3://example-staging-flink/recovery/blob/cache/blob_3ef7422ce7b5e5cbf1f031b0de1561159109d7f9
2016-04-14 12:54 3023898 s3://example-staging-flink/recovery/blob/cache/blob_5062028a8cab14daaeb19e51f01a02da3a8e515a
2016-04-14 12:29 3025864 s3://example-staging-flink/recovery/blob/cache/blob_7809e559953291cab482e9cf3324457ad07d6d05
The JobManager log contains the following entries:
2016-04-21 12:34:55,684 INFO org.apache.flink.runtime.checkpoint.SavepointStoreFactory - Using job manager savepoint state backend.
2016-04-25 01:13:14,569 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Initialized in '/checkpoints/a5f89242c729190e46baf409768960fb'.
2016-04-25 01:13:14,581 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator - Create CheckpointCoordinatorDeActivator
2016-04-25 01:13:14,583 INFO org.apache.flink.runtime.checkpoint.SavepointCoordinatorDeActivator - Create SavepointCoordinatorDeActivator
2016-04-25 01:13:14,583 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
2016-04-25 01:13:14,594 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 1 checkpoints in ZooKeeper.
2016-04-25 01:13:14,875 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Initialized with Checkpoint 1015 @ 1461546663803 for a5f89242c729190e46baf409768960fb. Removing all older checkpoints.
2016-04-25 01:18:15,247 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1016 @ 1461547095238
2016-04-25 01:18:18,955 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1016 (in 153 ms)
2016-04-25 01:23:15,242 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1017 @ 1461547395238
2016-04-25 01:23:17,357 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1017 (in 138 ms)
2016-04-25 01:28:15,244 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1018 @ 1461547695239
2016-04-25 01:28:18,300 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1018 (in 101 ms)
So, can anybody explain why my standalone HA Apache Flink cluster doesn't save checkpoints to the configured storage?
Upvotes: 3
Views: 1784
Reputation: 9961
I found the following message in the TaskManager log:
2016-05-06 10:08:30,591 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Using user-defined state backend: MemoryStateBackend (data in heap memory / checkpoints to JobManager)
It was created here.
So I had simply forgotten to remove the following line from my code:
env.setStateBackend(new MemoryStateBackend());
After I removed it and redeployed my flow, Flink started writing checkpoints to the file system directory specified by the state.backend.fs.checkpointdir parameter.
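To make the precedence explicit, here is a toy model (not Flink's real classes; BackendResolution and resolve are hypothetical names) of what the log line above implies: a backend set in code via env.setStateBackend(...) wins, and the state.backend key from flink-conf.yaml only applies when no backend was set in code. The fallback value "jobmanager" is an assumption about the default of that era, used here only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: BackendResolution and resolve(...) are hypothetical, not Flink APIs.
public class BackendResolution {

    /**
     * Picks the state backend a task would use.
     * A backend set in code (env.setStateBackend(...)) takes precedence;
     * only when none is set does "state.backend" from flink-conf.yaml apply.
     */
    static String resolve(String userDefinedBackend, Map<String, String> flinkConf) {
        if (userDefinedBackend != null) {
            // Corresponds to the TaskManager log line "Using user-defined state backend: ..."
            return userDefinedBackend;
        }
        // Fall back to the cluster configuration; "jobmanager" is an ASSUMED default.
        return flinkConf.getOrDefault("state.backend", "jobmanager");
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("state.backend", "filesystem");
        conf.put("state.backend.fs.checkpointdir", "s3a://example-staging-flink/checkpoints");

        // Leftover env.setStateBackend(new MemoryStateBackend()) in the job: config is ignored.
        System.out.println(resolve("MemoryStateBackend", conf));
        // After removing that line, the flink-conf.yaml setting takes effect.
        System.out.println(resolve(null, conf));
    }
}
```

In other words, the cluster-wide setting is only a default; any backend configured in the job itself silently overrides it, which is why checking the TaskManager log for the "Using user-defined state backend" line is a quick way to spot this.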
Upvotes: 1
Reputation: 615
Flink doesn't write the actual state to files if it is smaller than a given threshold. The default threshold (adjustable via state.backend.fs.memory-threshold) is 1024 bytes. Below this threshold, the state is stored alongside the checkpoint metadata.
The idea behind this threshold is that writing small pieces of state as separate files is relatively expensive on distributed file systems. The metadata has to be written anyway, so it can simply carry a little more data.
Setting state.backend.fs.memory-threshold: 0 should make Flink always write the state to your checkpoint directory, regardless of its size.
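A simplified model of the threshold rule as described in this answer may help. This is not Flink's actual implementation: CheckpointPlacement, Placement and placeFor are hypothetical names, and Flink's real comparison may behave differently at exactly the threshold value. It shows why small state never shows up as a file under checkpointdir with the default of 1024 bytes, and why a threshold of 0 forces every state into its own file.

```java
// Simplified model of the rule described above, NOT Flink's actual implementation.
// CheckpointPlacement, Placement and placeFor(...) are hypothetical names.
public class CheckpointPlacement {

    /** Default of state.backend.fs.memory-threshold in bytes, as stated in the answer. */
    static final long DEFAULT_MEMORY_THRESHOLD = 1024;

    enum Placement { INLINE_WITH_METADATA, FILE_IN_CHECKPOINT_DIR }

    /**
     * State smaller than the threshold travels with the checkpoint metadata;
     * anything else is written as a file in the checkpoint directory.
     * (Exact boundary handling in Flink itself is not modelled here.)
     */
    static Placement placeFor(long stateSizeBytes, long memoryThresholdBytes) {
        return stateSizeBytes < memoryThresholdBytes
                ? Placement.INLINE_WITH_METADATA
                : Placement.FILE_IN_CHECKPOINT_DIR;
    }

    public static void main(String[] args) {
        // Small state: kept with the metadata, so no file appears under checkpointdir.
        System.out.println(placeFor(200, DEFAULT_MEMORY_THRESHOLD));
        // Large state: written as a file.
        System.out.println(placeFor(50_000, DEFAULT_MEMORY_THRESHOLD));
        // With state.backend.fs.memory-threshold: 0, even small state is written as a file.
        System.out.println(placeFor(200, 0));
    }
}
```

Running main prints INLINE_WITH_METADATA, FILE_IN_CHECKPOINT_DIR and FILE_IN_CHECKPOINT_DIR, one per line.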
Upvotes: 2