Reputation: 379
I hit a StackOverflowError when persisting a DataFrame with PySpark. I cast the whole DataFrame to DoubleType and then persist it to calculate statistics, and I read that checkpointing is a solution to StackOverflowError. However, I am having trouble implementing it on Dataproc.
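A toy, pure-Python model (not Spark code) of the reasoning above. Each derived value keeps a pointer to its parent, so evaluating it recurses through the whole lineage, and a long enough chain exhausts the stack. Periodically materializing the value and dropping the parent, which is what checkpointing does to an RDD or DataFrame lineage, keeps the recursion shallow. Node, build and overflows are hypothetical names made up for this sketch. In Spark the deep recursion happens on the JVM side, so the error there is java.lang.StackOverflowError rather than Python's RecursionError.

```python
class Node:
    """Toy stand-in for an RDD/DataFrame: a value derived from a parent by fn."""

    def __init__(self, parent=None, fn=None, value=None):
        self.parent = parent
        self.fn = fn
        self.value = value

    def compute(self):
        # Recursive evaluation: one Python stack frame per step of lineage.
        if self.parent is None:
            return self.value
        return self.fn(self.parent.compute())

    def checkpoint(self):
        # Materialize the result and drop the lineage (new depth is 1).
        return Node(value=self.compute())


def build(steps, checkpoint_every=None):
    """Chain `steps` increments, optionally truncating the lineage periodically."""
    node = Node(value=0)
    for i in range(1, steps + 1):
        node = Node(parent=node, fn=lambda v: v + 1)
        if checkpoint_every and i % checkpoint_every == 0:
            node = node.checkpoint()
    return node


def overflows(node):
    """Return True if evaluating the node exhausts Python's recursion limit."""
    try:
        node.compute()
        return False
    except RecursionError:
        return True


print(overflows(build(5_000)))                        # one long, untruncated lineage
print(build(5_000, checkpoint_every=500).compute())   # lineage truncated every 500 steps
```

The checkpoint interval only has to keep each uncheckpointed stretch comfortably below the recursion limit; in Spark the same idea means checkpointing every N iterations of a long loop of transformations.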
I am working with PySpark. When I checkpoint the DataFrame and check it with df.isCheckpointed(), it returns False. However, when I debug it, df.rdd.is_checkpointed says True. Is there an issue with the package, or am I doing something wrong?
I thought localCheckpoint would be more appropriate for my purpose (https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/rdd/RDD.html#localCheckpoint()), since my problem was simply that the DAG was too deep, but I couldn't find any usage examples. Also, if I just checkpoint, the RDD says it is checkpointed (as in my first question), but if I use localCheckpoint, it says it is not. Has anyone tried this function?
After trying local standalone mode, I tried it on Dataproc. I tried both HDFS and Google Cloud Storage as the checkpoint directory, but either way the directory stayed empty, even though the RDD says it is checkpointed.
Can anyone help me with this? Thanks in advance!
Upvotes: 0
Reputation: 325
Set a checkpoint directory (here a temp dir) using your SparkSession object:
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
dataframe_name = spark.range(10)  # stand-in for any Spark DataFrame of type <pyspark.sql.dataframe.DataFrame>
At this point, dataframe_name is still a DAG, which you can store as a checkpoint like this:
dataframe_checkpoint = dataframe_name.checkpoint()
dataframe_checkpoint is also a Spark DataFrame of type <pyspark.sql.dataframe.DataFrame>, but instead of the DAG, it stores the result of the query.
Use checkpoints if:
Upvotes: 0
Reputation: 1383
If you're using localCheckpoint, it will write to the local disk of executors, not to HDFS/GCS: https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/rdd/RDD.html#localCheckpoint--.
Also note that there's an eager (checkpoint immediately) and non-eager (checkpoint when the RDD is actually materialized) mode to checkpointing. That may affect what those methods return. The code is often the best documentation: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1615.
In general, please attach sample code (repros) to questions like this -- that way we can answer your question more directly.
Upvotes: 1