Yong Hyun Kwon

Reputation: 379

PySpark checkpointing in Dataproc (StackOverflowError)

I hit a StackOverflowError when persisting a dataset with PySpark. I am casting the whole DataFrame to DoubleType and then persisting it to calculate statistics. I read that checkpointing is a solution to the stack overflow, but I am having trouble implementing it in Dataproc.

  1. I am working with PySpark. When I checkpoint the DataFrame and check it with df.isCheckpointed(), it returns False. However, when I debug it, df.rdd.is_checkpointed is True. Is there an issue with the package, or am I doing something wrong?

  2. I thought localCheckpoint was more appropriate for my purpose (https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/rdd/RDD.html#localCheckpoint()), since my problem was simply that the DAG was too deep, but I couldn't find any usage examples. Also, with a regular checkpoint the RDD reports that it is checkpointed (as in the first question), but with localCheckpoint it reports that it is not. Has anyone tried this function?

  3. After trying local standalone mode, I tried it on Dataproc. I tried both HDFS and Google Cloud Storage as the checkpoint location, but either way the storage was empty, even though the RDD reports that it is checkpointed.

Can anyone help me with this? Thanks in advance!

Upvotes: 0

Views: 1303

Answers (2)

Jatin Chauhan

Reputation: 325

Set a checkpoint directory through your SparkSession object:

spark.sparkContext.setCheckpointDir("/tmp/checkpoints")


dataframe_name = ...  # any Spark DataFrame of type <pyspark.sql.dataframe.DataFrame>

At this point, dataframe_name is only a query plan (a DAG) that has not been computed yet. You can store it as a checkpoint like this:

dataframe_checkpoint = dataframe_name.checkpoint()

dataframe_checkpoint is also a Spark DataFrame of type <pyspark.sql.dataframe.DataFrame>, but instead of carrying the DAG it is backed by the stored result of the query.

Use checkpoints if:

  1. the computation takes a long time
  2. the computing chain is too long
  3. the result depends on too many RDDs

Upvotes: 0

Karthik Palaniappan

Reputation: 1383

If you're using localCheckpoint, it will write to the local disk of executors, not to HDFS/GCS: https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/rdd/RDD.html#localCheckpoint--.

Also note that checkpointing has an eager mode (checkpoint immediately) and a non-eager mode (checkpoint when the RDD is actually materialized). That may affect what those methods return. The code is often the best documentation: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1615.

In general, please attach sample code (repros) to questions like this -- that way we can answer your question more directly.

Upvotes: 1
