Maxim

Reputation: 9961

Why doesn't a standalone HA Flink cluster save checkpoints to the `state.backend.fs.checkpointdir` directory?

I have a running standalone HA Flink cluster that makes a checkpoint of my flow every minute, but I don't see the checkpoints in the state.backend.fs.checkpointdir directory.

flink-conf.yaml

jobmanager.heap.mb: 1024
jobmanager.web.port: 8081

taskmanager.data.port: 6121
taskmanager.heap.mb: 2048
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.preallocate: false
taskmanager.tmp.dirs: /flink/data/task_manager

blob.server.port: 6130
blob.storage.directory: /flink/data/blob_storage

parallelism.default: 4

state.backend: filesystem
state.backend.fs.checkpointdir: s3a://example-staging-flink/checkpoints

restart-strategy: none
restart-strategy.fixed-delay.attempts: 2
restart-strategy.fixed-delay.delay: 60s

recovery.mode: zookeeper
recovery.zookeeper.quorum: zookeeper-1.stag.local:2181,zookeeper-2.stag.local:2181,zookeeper-3.stag.local:2181
recovery.zookeeper.path.root: /example_staging/flink
recovery.zookeeper.storageDir: s3a://example-staging-flink/recovery
recovery.jobmanager.port: 6123

fs.hdfs.hadoopconf: /flink/conf

As you can see, the checkpoints should be saved to the s3a://example-staging-flink/checkpoints directory, but I don't see them there:

~ s3cmd ls s3://example-staging-flink/
                       DIR   s3://example-staging-flink/recovery/
~ s3cmd ls s3://example-staging-flink/recovery/
                       DIR   s3://example-staging-flink/recovery/blob/
2016-04-15 10:33   1137280   s3://example-staging-flink/recovery/completedCheckpoint6eab84c79b02
2016-04-15 01:23    506961   s3://example-staging-flink/recovery/completedCheckpoint9e8f3d1254aa
2016-04-15 09:39    149987   s3://example-staging-flink/recovery/submittedJobGraph0bf82ada1dc6
~ s3cmd ls s3://example-staging-flink/recovery/blob/
                       DIR   s3://example-staging-flink/recovery/blob/cache/
~ s3cmd ls s3://example-staging-flink/recovery/blob/cache/
2016-04-14 13:00   3023995   s3://example-staging-flink/recovery/blob/cache/blob_0b6e57360c05128b3c91d75341785df64b91217b
2016-04-15 09:39   3066784   s3://example-staging-flink/recovery/blob/cache/blob_3ef7422ce7b5e5cbf1f031b0de1561159109d7f9
2016-04-14 12:54   3023898   s3://example-staging-flink/recovery/blob/cache/blob_5062028a8cab14daaeb19e51f01a02da3a8e515a
2016-04-14 12:29   3025864   s3://example-staging-flink/recovery/blob/cache/blob_7809e559953291cab482e9cf3324457ad07d6d05

The JobManager log contains the following entries:

2016-04-21 12:34:55,684 INFO  org.apache.flink.runtime.checkpoint.SavepointStoreFactory     - Using job manager savepoint state backend.
2016-04-25 01:13:14,569 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Initialized in '/checkpoints/a5f89242c729190e46baf409768960fb'.
2016-04-25 01:13:14,581 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator  - Create CheckpointCoordinatorDeActivator
2016-04-25 01:13:14,583 INFO  org.apache.flink.runtime.checkpoint.SavepointCoordinatorDeActivator  - Create SavepointCoordinatorDeActivator
2016-04-25 01:13:14,583 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Recovering checkpoints from ZooKeeper.
2016-04-25 01:13:14,594 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Found 1 checkpoints in ZooKeeper.
2016-04-25 01:13:14,875 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Initialized with Checkpoint 1015 @ 1461546663803 for a5f89242c729190e46baf409768960fb. Removing all older checkpoints.
2016-04-25 01:18:15,247 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1016 @ 1461547095238
2016-04-25 01:18:18,955 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1016 (in 153 ms)
2016-04-25 01:23:15,242 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1017 @ 1461547395238
2016-04-25 01:23:17,357 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1017 (in 138 ms)
2016-04-25 01:28:15,244 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1018 @ 1461547695239
2016-04-25 01:28:18,300 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1018 (in 101 ms)

So, can anybody explain why a standalone HA cluster of Apache Flink doesn't save its checkpoints to the configured storage?

Upvotes: 3

Views: 1784

Answers (2)

Maxim

Reputation: 9961

I found the following log message in the TaskManager log:

2016-05-06 10:08:30,591 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Using user-defined state backend: MemoryStateBackend (data in heap memory / checkpoints to JobManager)

It was created here.

So, I had simply forgotten to remove the following line from my code:

env.setStateBackend(new MemoryStateBackend());

After I removed it and re-deployed my flow, Flink started writing checkpoints to the file system directory specified by the state.backend.fs.checkpointdir parameter.
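
For reference, a minimal sketch of the fixed job setup (the class name, checkpoint interval, and exact backend constructor used here are illustrative, not taken from the original job): either leave the state backend unset in code so the flink-conf.yaml settings apply, or point an FsStateBackend at the same checkpoint directory explicitly.

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedFlow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoint the flow every minute
        env.enableCheckpointing(60000);

        // Option 1: set no backend in code, so state.backend and
        // state.backend.fs.checkpointdir from flink-conf.yaml take effect.
        // Option 2: set the filesystem backend explicitly:
        env.setStateBackend(new FsStateBackend("s3a://example-staging-flink/checkpoints"));

        // ... build the actual flow on env here ...

        env.execute("checkpointed-flow");
    }
}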

Upvotes: 1

mxm

Reputation: 615

Flink doesn't write the actual state to files if it is smaller than a given threshold. The default threshold (adjustable via state.backend.fs.memory-threshold) is 1024 bytes. Below this threshold, the state is stored alongside the checkpoint metadata.

The idea behind this threshold is that writing small state to disk is relatively expensive on distributed file systems. The metadata has to be written anyway, so it simply carries a little more data.

Setting state.backend.fs.memory-threshold: 0 makes Flink always write the state to your checkpoint directory, regardless of its size.
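
For example, the threshold can be set right next to the backend options already shown in the question's flink-conf.yaml (a sketch of that configuration):

state.backend: filesystem
state.backend.fs.checkpointdir: s3a://example-staging-flink/checkpoints
# write even tiny state to the checkpoint directory instead of the metadata
state.backend.fs.memory-threshold: 0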

Upvotes: 2
