Reputation: 495
I am new to Spark and could use some guidance here. We have some basic code to read in a CSV, cache it, and write it out to Parquet:
1. val df = sparkSession.read.options(options).schema(schema).csv(path)
2. val dfCached = df.withColumn(...).orderBy(someCol).cache()
3. dfCached.write.partitionBy(partitioning).parquet(outputPath)
AFAIK, once we invoke the parquet call (an action), the cache should be populated, saving the state of the DF before the action is applied.
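For reference, here is a minimal, runnable version of the snippet; the schema, column names, options, and paths are hypothetical placeholders, not the actual values from our job:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.upper
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object CsvToParquet {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("csv-to-parquet").getOrCreate()
    import spark.implicits._

    // Placeholder schema for illustration only.
    val schema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("category", StringType),
      StructField("value", StringType)
    ))

    // 1. Lazy: nothing is read from disk yet.
    val df = spark.read
      .options(Map("header" -> "true"))
      .schema(schema)
      .csv("/path/to/input.csv")

    // 2. Also lazy: cache() only marks the plan for caching, it does not
    //    materialize anything by itself.
    val dfCached = df
      .withColumn("value_upper", upper($"value"))
      .orderBy($"category")
      .cache()

    // 3. The write is the action that triggers the read, the sort, the cache
    //    population, and the Parquet output.
    dfCached.write
      .partitionBy("category")
      .parquet("/path/to/output")

    spark.stop()
  }
}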
In the Spark UI I see a cache job (from the cache() call in #2 above) and a parquet job (from the parquet() call). The parquet job has 2 stages: one which seems to repeat the caching step and a second which performs the conversion to parquet (see images below). Why do I have both a caching Job and a caching Stage? I would expect to have only one or the other, but it seems like we are caching twice here.
Upvotes: 2
Views: 472
Reputation: 495
I'm not 100% sure, but it seems that the following is happening:
When the CSV data is loaded, it is split among the worker nodes. We call cache(), and each node stores the data it received in memory. This is the first caching job.
When we call partitionBy(...), the data needs to be regrouped among the executors based on the arguments passed to the function. Since we are caching, and data has moved from one executor to another, the shuffled data needs to be re-cached. This is confirmed by the second caching stage showing some shuffle write data. Furthermore, the caching stage shows fewer tasks than the initial caching job, possibly because only the shuffled data needs to be re-cached rather than the entire data frame.
Finally, the parquet stage is invoked. We can see some shuffle read data, which shows the executors reading the newly shuffled data; a rough way to inspect this is sketched below.
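Not part of the question itself, but a minimal sketch of how to poke at this, assuming the dfCached name from the snippet above (the exact plan text varies by Spark version):

// Hypothetical inspection step; dfCached is the cached DataFrame from the question.
// storageLevel reports whether cache() has been requested for this Dataset, and
// explain() prints the plan, where InMemoryRelation / InMemoryTableScan and
// Exchange nodes line up with the caching and shuffle work that shows up as
// separate jobs/stages in the Spark UI.
println(dfCached.storageLevel) // e.g. StorageLevel(disk, memory, deserialized, 1 replicas)
dfCached.explain(true)         // "true" also prints the logical plans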
Upvotes: 1