Reputation: 225
I faced this question in an interview. The sequence of operations is as below.
Read the csv file and create a DF: val DF = spark.read.format("csv").load("/path/file.csv")
Filter the data based on some condition: DF.filter(....)
Perform a count operation: DF.count()
Save the data to a file/table: DF.write.saveAsTable(...)
The question was: how many transformations and actions are there, and how many times is the csv file read to create the DF?
I am curious to understand the following:
Once the file is read, won't the DF stay in memory until the Spark job is complete? If the DF is not used in further steps, we don't mind it being removed from memory. But if it is used in further steps, won't it stay in memory until the step where it is last used?
Do the file read and DF creation happen every time an action is triggered? Why would this be required at all?
I understand that the file is read only when an action is performed.
Now, when the count() action is called first, the file has been read into memory and a DF has been built, so I expect it to stay in memory. But why will the saveAsTable() action read the file again and reconstruct the DF using the DAG?
If Spark re-evaluates the DF (using the DAG) every time we use the DF reference in our code, isn't it time-consuming to reconstruct the DF each time? And if, to avoid this reconstruction, we start persisting the DFs, won't we end up occupying all the memory, spilling some data to disk, and slowing down the Spark application?
Can anyone explain what's going on?
Upvotes: 0
Views: 403
Reputation: 88
All operations are grouped into "stages" in the Spark DAG, and stages are separated by "shuffle" operations.
Operations within a stage are grouped together because they work on data that is co-located in a single partition.
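For example, a narrow transformation such as filter stays in the same stage as the file scan, while a wide transformation such as groupBy forces a shuffle and therefore starts a new stage. A quick sketch (the column names "status" and "country" are made-up placeholders):
import org.apache.spark.sql.functions.col
val csvDf = spark.read.format("csv").load("/path/file.csv")
val filtered = csvDf.filter(col("status") === "active")   // narrow: runs in the same stage as the file scan
val perCountry = filtered.groupBy(col("country")).count() // wide: forces a shuffle, so a new stage starts here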
So, whenever an action (count() and saveAsTable() in this case) is performed, it triggers re-computation of everything that produced the DF, unless that DF is cached.
var df = spark.read.format("csv").load("/path/file.csv")
df = df.filter(....) // you are overwriting the df reference with the filtered DF
// df.cache() // use this if the read and filter should be performed only once
val cnt = df.count() // action - triggers the read and filter operations
df.write.saveAsTable(...) // action - triggers the read and filter operations again, unless cached
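If you want to verify this yourself, explain() prints the physical plan: without cache() the plan goes all the way down to the FileScan of the csv file, and after cache() an InMemoryRelation/InMemoryTableScan node appears instead, meaning the next action reads the cached data rather than the file.
df.explain() // without cache(): the plan ends in a FileScan of the csv
df.cache()
df.count()   // materializes the cache
df.explain() // now the plan shows InMemoryTableScan over an InMemoryRelation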
Regarding your questions:
If it is re-evaluating the DF (using the DAG) every time we use the DF reference in our code, is it not time-consuming to reconstruct the DF every time?
Yes, it can be, and that is exactly why cache()/persist() exist: persist a DF when it is reused by more than one action.
If we have to avoid this reconstruction process every time and we start persisting the DFs, will we not end up occupying all the memory, spilling some data to disk, and slowing down the Spark application?
You don't need to persist every DF, only the ones that are actually reused. persist() also lets you choose a storage level (e.g. MEMORY_AND_DISK), and Spark evicts cached partitions in LRU order when it needs the memory, so selective caching normally does not slow the application down.
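A minimal sketch of that selective persistence (the storage level, the column name "status", and the table name "my_table" are my own placeholders, not from the question):
import org.apache.spark.sql.functions.col
import org.apache.spark.storage.StorageLevel
val filtered = spark.read.format("csv").load("/path/file.csv")
  .filter(col("status") === "active")
// persist only the DF that more than one action reuses;
// MEMORY_AND_DISK spills partitions to disk instead of dropping them
filtered.persist(StorageLevel.MEMORY_AND_DISK)
val cnt = filtered.count()             // first action reads the file and fills the cache
filtered.write.saveAsTable("my_table") // second action reuses the cached data
filtered.unpersist()                   // free the cached blocks when no longer needed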
Upvotes: 1