BiS

Reputation: 503

Spark createDataFrame(df.rdd, df.schema) vs checkPoint for breaking lineage

I'm currently using

val df = longLineageCalculation(....)
val newDf = sparkSession.createDataFrame(df.rdd, df.schema)
newDf.join......

I do this in order to save time when calculating plans. However, the docs say that checkpointing is the suggested way to "cut" lineage, BUT I don't want to pay the price of saving the RDD to disk.

My process is a batch process which is not-so-long and can be restarted without issues, so checkpointing is of no benefit to me (I think).

What are the problems that can arise using "my" method? (The docs suggest checkpointing, which is more expensive, instead of this one for breaking lineages, and I would like to know the reason.)

The only thing I can guess is that if some node fails after my "lineage breaking", maybe my process will fail while the checkpointed one would have worked correctly? (What if the DF is cached instead of checkpointed?)

Thanks!

EDIT:

Based on SMaZ's answer, my own knowledge, and the article he provided: using createDataFrame (which is a developer API, so use at "my"/your own risk) will keep the lineage in memory (not a problem for me, since I don't have memory problems and the lineage is not big).

With this, it looks (not 100% tested) like Spark should be able to rebuild whatever is needed if it fails.

As I'm not reusing the data in subsequent executions, I'll go with cache + createDataFrame rather than checkpointing (which, if I'm not wrong, is actually cache + save to HDFS + "createDataFrame").
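Roughly, this is the pattern I mean by cache + createDataFrame (just a sketch, not my real job; longLineageCalculation is replaced by a dummy computation and the local master is only for illustration):

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .appName("cache-plus-createDataFrame-sketch")
  .master("local[*]")              // assumption: local run, just for illustration
  .getOrCreate()

// Stand-in for my real longLineageCalculation(....)
val df: DataFrame = spark.range(0, 1000).toDF("id")
  .withColumn("double", col("id") * 2)

// Materialize the result in memory so the re-created DataFrame
// does not recompute the long lineage on every use.
df.cache()
df.count()                         // force the cache to be populated

// Re-create the DataFrame from the cached data + schema: the new plan
// starts from the existing RDD instead of the full lineage.
val newDf = spark.createDataFrame(df.rdd, df.schema)
newDf.count()                      // stand-in for the joins I do afterwards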

My process is not that critical (if it crashes), since a user is always expecting the result and launches it manually, so if it gives problems they can relaunch it (plus Spark will relaunch it) or call me. So I can take some risk anyway, but I'm 99% sure there's no risk :)

Upvotes: 8

Views: 2949

Answers (2)

Rituparno Behera

Reputation: 187

I think sparkSession.createDataFrame(df.rdd, df.schema) will impact the fault-tolerance property of Spark.

But checkpoint() will save the RDD to HDFS or S3, and hence if a failure occurs, it will recover from the last checkpointed data.

In the case of createDataFrame(), it just breaks the lineage graph.

Upvotes: 0

SMaZ

Reputation: 2655

Let me start with the DataFrame created by the line below:

val newDf = sparkSession.createDataFrame(df.rdd, df.schema)

If we take a close look at the SparkSession class, this method is annotated with @DeveloperApi. To understand what this annotation means, take a look at the lines below from the DeveloperApi class:

A lower-level, unstable API intended for developers.

Developer API's might change or be removed in minor versions of Spark.

So it is not advised to use this method for production solutions; in the open-source world this is what's called a "use at your own risk" implementation.

However, let's dig deeper into what happens when we call createDataFrame from an RDD. It calls the private internalCreateDataFrame method and creates a LogicalRDD.

LogicalRDD is created when:

  • Dataset is requested to checkpoint
  • SparkSession is requested to create a DataFrame from an RDD of internal binary rows

So it is essentially the same as the checkpoint operation without saving the dataset physically. It just creates a DataFrame from an RDD of internal binary rows and a schema. This might truncate the lineage in memory, but not at the physical level.
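If you want to see this for yourself, a quick check (my own illustration, not from the Spark docs) is to compare the plans of the original and the re-created DataFrame:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("plan-check").master("local[*]").getOrCreate()

// Some arbitrary lineage to compare against.
val df = spark.range(0, 100).toDF("id")
  .withColumn("square", col("id") * col("id"))
  .filter(col("square") > 10)

df.explain(true)       // plan shows the full Range -> Project -> Filter lineage

val newDf = spark.createDataFrame(df.rdd, df.schema)
newDf.explain(true)    // plan starts from the existing RDD (LogicalRDD / Scan ExistingRDD)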

So I believe it's just the overhead of creating another RDD, and it cannot be used as a replacement for checkpoint.

Now, checkpointing is the process of truncating the lineage graph and saving the data to a reliable distributed/local file system.
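For completeness, this is roughly how checkpointing is wired up (the checkpoint directory path here is only an example; on a cluster it should point to a reliable store such as HDFS or S3):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("checkpoint-sketch").master("local[*]").getOrCreate()

// The checkpoint directory must be set before calling checkpoint().
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")   // example path

val df = spark.range(0, 1000).toDF("id")

// Eager checkpoint (the default): materializes the data to the checkpoint
// directory right away and returns a DataFrame with a truncated lineage.
val checkpointed = df.checkpoint()

// Lazy variant: the data is written the first time the result is actually used.
val lazyCheckpointed = df.checkpoint(eager = false)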

Why checkpoint?

  • If the computation takes a long time, the lineage is too long, or it depends on too many RDDs

  • Keeping heavy lineage information comes at the cost of memory.

  • Checkpoint files are not deleted automatically even after the Spark application terminates, so they can be used by some other process.

What are the problems that can arise using "my" method? (The docs suggest checkpointing, which is more expensive, instead of this one for breaking lineages, and I would like to know the reason.)

This article gives detailed information on cache and checkpoint. IIUC, your question is more about where we should use checkpointing. Let's discuss some practical scenarios where checkpointing is helpful:

  1. Let's take a scenario where we have one dataset on which we want to perform 100 iterative operations, and each iteration takes the previous iteration's result as input (Spark MLlib use cases). During this iterative process the lineage keeps growing. Here, checkpointing the dataset at a regular interval (say every 10 iterations) ensures that in case of any failure we can restart the process from the last checkpoint (see the sketch after this list).
  2. Let's take a batch example. Imagine we have a batch job that creates one master dataset with heavy lineage or complex computations. Then, at regular intervals, we receive some data that should use the previously calculated master dataset. Here, if we checkpoint our master dataset, it can be reused by all subsequent processes from different SparkSessions.
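A sketch of the first scenario (the iteration body, the interval of 10, and the checkpoint path are placeholders, not a recommendation for any specific algorithm):

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("iterative-checkpoint").master("local[*]").getOrCreate()
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")   // example path

// Placeholder for one iteration of an iterative algorithm.
def oneIteration(input: DataFrame): DataFrame =
  input.withColumn("value", col("value") * 0.9)

var current = spark.range(0, 1000).toDF("id")
  .withColumn("value", col("id").cast("double"))

for (i <- 1 to 100) {
  current = oneIteration(current)
  // Truncate the lineage every 10 iterations so the plan does not keep growing
  // and a failure only has to recompute from the last checkpoint.
  if (i % 10 == 0) {
    current = current.checkpoint()
  }
}

current.count()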

My process is a batch process which is not-so-long and can be restarted without issues, so checkpointing is of no benefit to me (I think).

That's correct. If your process is not heavy on computation or lineage, then there is no point in checkpointing. The rule of thumb is: if your dataset is not used multiple times and can be rebuilt faster than the time taken and resources used for checkpoint/cache, then we should avoid it. That will leave more resources for your process.

Upvotes: 7
