James Yu

Reputation: 519

Two questions on Flink externalized checkpoints

I have two questions about Flink externalized checkpoints.

(Q1) I can set "state.checkpoints.dir" in flink-conf.yaml and externalized checkpoints work fine, but how do I achieve the same thing when I run Flink from the IDE? I tried the GlobalConfiguration approach mentioned in (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/state-checkpoints-dir-td17921.html) but had no luck. This is how I did it:

Configuration cfg = GlobalConfiguration.loadConfiguration();
cfg.setString("state.checkpoints.dir", "file:///tmp/checkpoints/state");
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

And this is the error message shown in the IDE:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Failed to submit job ef7050e2308a4787d983d80f3c07f55c (Long Taxi Rides (checkpointed))
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1325)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:447)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured. You can configure configure one via key 'state.checkpoints.dir'.
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:211)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:478)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:291)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1277)
    ... 19 more

Process finished with exit code 1

(Q2) The checkpointing documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/checkpointing.html) says "This way, you will have a checkpoint around to resume from if your job fails." What about cancelled jobs? Will the new job carry on from the existing checkpoint, or will it start with its own checkpoint?

Upvotes: 5

Views: 2225

Answers (3)

Xilang

Reputation: 1513

For the first question, create a local environment with a custom config:

val conf = new Configuration()
conf.setString(CoreOptions.CHECKPOINTS_DIRECTORY, "file:///user/flink/checkpoint/storing/")
StreamExecutionEnvironment.createLocalEnvironment(4, conf)

For the second question, as David said, retain the checkpoint on cancellation:

config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

Upvotes: 3

David Anderson

Reputation: 43707

You can control whether externalized checkpoints are deleted when the job is cancelled. If you want to retain them, you can do this:

CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

For more info, see the docs.

To resume from an externalized checkpoint one does this (the same as resuming from a savepoint):

$ bin/flink run -s :checkpointMetaDataPath [:runArgs]

Upvotes: 4

kkrugler

Reputation: 9280

Re setting the checkpoint directory from Eclipse, normally I just do it when setting up the backend to use, e.g.

env.setStateBackend(new FsStateBackend(options.getCheckpointDir()));

Re cancelled jobs - the checkpoint directory gets removed. You need to do a savepoint if you want to resume from a known state after stopping (cancelling) your job.
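For the savepoint route, the commands look roughly like this. This is a sketch based on the usage templates in the Flink CLI docs of the 1.4 era; :jobId, :targetDirectory, :savepointPath and :runArgs are placeholders you fill in yourself, not literal values, and the exact options may differ in other Flink versions.

```shell
# Trigger a savepoint for a running job (target directory is optional
# if a default savepoint directory is configured)
$ bin/flink savepoint :jobId [:targetDirectory]

# Cancel the job and take a savepoint in one step
$ bin/flink cancel -s [:targetDirectory] :jobId

# Start a new job from that savepoint
$ bin/flink run -s :savepointPath [:runArgs]
```

The last command has the same form as resuming from an externalized checkpoint; only the path passed to -s differs.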

Upvotes: 1
