Reputation: 21830
I've set the checkpoint directory with the sc.setCheckpointDir
method.
/checkpointDirectory/
I've then created a checkpoint of an rdd: rdd.checkpoint()
and in the directory, I now see a new directory representing the new checkpoint, in the form of a random string of letters. Inside that directory there is nothing.
/checkpointDirectory/37d2812a-bca2-4fc5-b5d4-221ae03a6d25/ [empty]
Then after doing a couple transformations, I run rdd.checkpoint()
again, and there is still nothing in that recently created directory
/checkpointDirectory/37d2812a-bca2-4fc5-b5d4-221ae03a6d25/ [empty]
Am I using checkpoint()
wrong? What am I supposed to see in that directory to know its working properly?
Upvotes: 6
Views: 2003
Reputation: 354
Checkpoint will be done periodically(i.e. checkpoint duration). You need to tell checkpoint duration to your spark context.
Upvotes: -1
Reputation: 330353
checkpoint
, as many other operations in Spark, is a lazy. Data is actually checkpointed if and only if a given RDD is materialized. Empty directory you see is application specific checkpoint directory.
If you want checkpoint to take place you have to trigger an action which will evaluate corresponding RDD. By example (local mode):
import glob
import os
from urllib.parse import urlparse
sc.setCheckpointDir("/tmp/checkpoints/")
ch_dir = os.path.join(urlparse(sc._jsc.getCheckpointDir().orElse("")).path, "*")
rdd = sc.range(1000, 10)
plus_one = rdd.map(lambda x: x + 1)
plus_one.cache()
plus_one.checkpoint() # No checkpoint dir here yet
[os.path.split(x)[-1] for x in glob.glob(ch_dir)]
## []
plus_one.isCheckpointed()
## False
# After count is executed you'll see rdd specific checkpoint dir
plus_one.count()
[os.path.split(x)[-1] for x in glob.glob(ch_dir)]
## ['rdd-1']
plus_one.isCheckpointed()
## True
You can also analyze debug strings before:
## (8) PythonRDD[1] at RDD at PythonRDD.scala:48 [Memory Serialized 1x Replicated]
## | ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:475 [Memory Serialized 1x Replicated]
and after an action:
## (8) PythonRDD[1] at RDD at PythonRDD.scala:48 [Memory Serialized 1x Replicated]
## | CachedPartitions: 8; MemorySize: 168.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
## | ReliableCheckpointRDD[3] at count at <ipython-input-16-96e746c56973>:1 [Memory Serialized 1x Replicated]
As you can see before RDD will be computed from scratch but after count
you'll get ReliableCheckpointRDD
.
Upvotes: 7