Reputation: 25852
The following trick is used to trim the lineage of an Apache Spark DataFrame, especially in iterative computations:
def getCachedDataFrame(df: DataFrame): DataFrame = {
  val rdd = df.rdd.cache()
  df.sqlContext.createDataFrame(rdd, df.schema)
}
It looks like some sort of pure magic, but right now I'm wondering why we need to invoke the cache() method on the RDD. What is the purpose of the cache in this lineage-trimming logic?
Upvotes: 3
Views: 645
Reputation: 5991
To understand the purpose of caching, it helps to understand the different types of RDD operations: transformations and actions. From the docs:
RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset.
Also consider this bit:
All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently. For example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.
So Spark's transformations (like map, for example) are all lazy, because this helps Spark be smart about which calculations need to be done when it creates a query plan.
Consider the following code:
// Reading in some data
val df = spark.read.parquet("some_big_file.parquet")

// Applying some transformations on the data (lazy operations here)
val cleansedDF = df
  .filter(filteringFunction)
  .map(cleansingFunction)

// Executing an action, triggering the transformations to be calculated
cleansedDF.write.parquet("output_file.parquet")

// Executing another action, triggering the transformations to be calculated again
println(s"You have ${cleansedDF.count} rows in the cleansed data")
Here we read in a file, apply some transformations, and run two actions on the same DataFrame: cleansedDF.write.parquet and cleansedDF.count.
As the comments in the code explain, running the code like this computes those transformations twice. Because transformations are lazy, they are executed only when an action requires them, and every action triggers them again.
How can we prevent this double calculation? With caching: we can tell Spark to "save" the result of some transformations so that they don't have to be calculated multiple times. The saved result can be kept in memory, on disk, or both, depending on the storage level.
So with this knowledge, our code could be something like this:
// Reading in some data
val df = spark.read.parquet("some_big_file.parquet")

// Applying some transformations on the data (lazy operations here) AND caching the result of this calculation
val cleansedDF = df
  .filter(filteringFunction)
  .map(cleansingFunction)
  .cache

// Executing an action, triggering the transformations to be calculated AND the results to be cached
cleansedDF.write.parquet("output_file.parquet")

// Executing another action, reusing the cached data
println(s"You have ${cleansedDF.count} rows in the cleansed data")
I've adjusted the comments in this code block to highlight the difference with the previous block.
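If you want to see the effect rather than take it on faith, here is a minimal sketch that counts how often a map function runs across two actions, with and without cache(). The object name RecomputeDemo, the helper mapCalls, the 100-element input and the local[2] master are all assumptions made for illustration, not part of the code above. It also relies on an accumulator updated inside a transformation, which Spark only guarantees to be exact when no tasks are retried, so treat the numbers as indicative.

```scala
import org.apache.spark.sql.SparkSession

object RecomputeDemo {
  // Hypothetical helper: runs two actions on the same mapped RDD and
  // returns how many times the map function was invoked in total.
  def mapCalls(spark: SparkSession, useCache: Boolean): Long = {
    val sc = spark.sparkContext
    val calls = sc.longAccumulator("map calls")

    // Lazy transformation: nothing runs until an action is called.
    val mapped = sc.parallelize(1 to 100).map { x => calls.add(1L); x * 2 }
    val data = if (useCache) mapped.cache() else mapped

    data.count() // first action: runs the map (and fills the cache, if enabled)
    data.count() // second action: reruns the map unless the data is cached

    data.unpersist() // release cached blocks (a no-op when nothing was cached)
    calls.value
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]") // assumption: a local run, just for the demo
      .appName("recompute-demo")
      .getOrCreate()
    try {
      println(s"without cache: ${mapCalls(spark, useCache = false)} map calls")
      println(s"with cache:    ${mapCalls(spark, useCache = true)} map calls")
    } finally {
      spark.stop()
    }
  }
}
```

In a local run without task retries, the uncached figure should come out at twice the cached one, because the second count() reruns the whole map instead of reading the stored partitions.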
Note that .persist also exists. With .cache you use the default storage level; with .persist you can specify the storage level yourself, as this SO answer nicely explains.
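As a rough sketch of that difference, the snippet below persists a DataFrame with an explicit storage level and reads the level back. The object name PersistDemo, the helper persistedLevel, the spark.range input and the choice of DISK_ONLY are assumptions made for illustration. For reference, the default level used by .cache is MEMORY_AND_DISK on a DataFrame/Dataset and MEMORY_ONLY on an RDD.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

object PersistDemo {
  // Hypothetical helper: persists a small DataFrame with an explicit
  // storage level and returns the level Spark reports for it.
  def persistedLevel(spark: SparkSession): StorageLevel = {
    val df: DataFrame = spark.range(0, 1000).toDF("id")

    // Unlike .cache, .persist lets you choose where the data is kept.
    df.persist(StorageLevel.DISK_ONLY)
    df.count() // persisting is lazy too: an action materialises the data

    val level = df.storageLevel
    df.unpersist() // free the storage once the data is no longer needed
    level
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]") // assumption: a local run, just for the demo
      .appName("persist-demo")
      .getOrCreate()
    try {
      println(s"storage level in use: ${persistedLevel(spark)}")
    } finally {
      spark.stop()
    }
  }
}
```

Calling unpersist once the data is no longer needed matters in iterative jobs, where cached results from earlier iterations would otherwise keep occupying memory or disk.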
Hope this helps!
Upvotes: 0