Kristian

Reputation: 21830

RDD.checkpoint() not storing any data in checkpoint directory

I've set the checkpoint directory with the sc.setCheckpointDir method.

/checkpointDirectory/

I've then called rdd.checkpoint() on an RDD, and in the checkpoint directory I now see a new subdirectory representing the checkpoint, named with a random string of letters. Inside that directory there is nothing.

/checkpointDirectory/37d2812a-bca2-4fc5-b5d4-221ae03a6d25/  [empty]

Then, after doing a couple of transformations, I run rdd.checkpoint() again, and there is still nothing in that recently created directory:

/checkpointDirectory/37d2812a-bca2-4fc5-b5d4-221ae03a6d25/  [empty]
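Roughly, the code I'm running looks like this (the data and transformations here are just simplified placeholders):

sc.setCheckpointDir("/checkpointDirectory/")

rdd = sc.parallelize(range(100))  # placeholder data
rdd.checkpoint()                  # /checkpointDirectory/<uuid>/ appears, but stays empty

rdd = rdd.map(lambda x: x + 1)    # a couple of transformations
rdd.checkpoint()                  # directory is still empty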

Am I using checkpoint() wrong? What am I supposed to see in that directory to know it's working properly?

Upvotes: 6

Views: 2003

Answers (2)

Raju Bairishetti

Reputation: 354

Checkpointing is done periodically (i.e. at the checkpoint duration). You need to tell the checkpoint duration to your Spark context.
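A minimal sketch of what this could look like, assuming it refers to Spark Streaming, where the interval passed to DStream.checkpoint() acts as the checkpoint duration (the host, port, and intervals below are made up):

from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, batchDuration=10)     # 10-second batches
ssc.checkpoint("/checkpointDirectory/")          # directory for streaming checkpoints

lines = ssc.socketTextStream("localhost", 9999)  # example input stream
lines.checkpoint(30)                             # checkpoint this DStream every 30 seconds
# (an output operation and ssc.start() would still be needed to actually run this)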

Upvotes: -1

zero323

Reputation: 330353

checkpoint, like many other operations in Spark, is lazy. Data is actually checkpointed if and only if a given RDD is materialized. The empty directory you see is the application-specific checkpoint directory.

If you want the checkpoint to take place, you have to trigger an action which evaluates the corresponding RDD. For example (local mode):

import glob
import os
from urllib.parse import urlparse

sc.setCheckpointDir("/tmp/checkpoints/")
# Resolve the application-specific checkpoint directory and build a glob pattern for it
ch_dir = os.path.join(urlparse(sc._jsc.getCheckpointDir().orElse("")).path, "*")

rdd = sc.range(1000)  # numbers 0..999, default number of partitions
plus_one = rdd.map(lambda x: x + 1)
plus_one.cache()
plus_one.checkpoint()  # No checkpoint dir here yet

[os.path.split(x)[-1] for x in glob.glob(ch_dir)]
## []
plus_one.isCheckpointed()
## False

# After count is executed you'll see rdd specific checkpoint dir
plus_one.count()  
[os.path.split(x)[-1] for x in glob.glob(ch_dir)]
## ['rdd-1']
plus_one.isCheckpointed()
## True

You can also analyze the debug strings before the action:

## (8) PythonRDD[1] at RDD at PythonRDD.scala:48 [Memory Serialized 1x Replicated]
##  |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:475 [Memory Serialized 1x Replicated]

and after an action:

## (8) PythonRDD[1] at RDD at PythonRDD.scala:48 [Memory Serialized 1x Replicated]
##  |       CachedPartitions: 8; MemorySize: 168.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
##  |  ReliableCheckpointRDD[3] at count at <ipython-input-16-96e746c56973>:1 [Memory Serialized 1x Replicated]

As you can see, before the action the RDD would be computed from scratch, but after count you get a ReliableCheckpointRDD in the lineage.
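For reference, the debug strings above come from toDebugString; PySpark returns it as bytes, so it is decoded here before printing:

print(plus_one.toDebugString().decode("utf-8"))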

Upvotes: 7
