shankar

Reputation: 225

Are DataFrames rebuilt from the DAG every time a DF is referenced multiple times in Spark code (if we don't use persist)?

I faced this question in an interview. The sequence of operations is as below.

Read the csv file and create a DF. DF = spark.read().format("csv").csv("/path/file.csv")

Filter the data based on some condition. DF.filter(....)

Perform a count operation. DF.count()

Save the Data in file/Table. DF.saveAsTable()

The question was: how many transformations and actions are there, and how many times is the CSV file read to create the DF?

I am curious to understand the following:

Once the file is read, won't the DF stay in memory till the spark job is complete? If the DF is not being used in further steps, we don't mind even if it is removed from memory. But if it is being used in further steps, won't it stay in the memory till the step where it is last used?

Do the file read and DF creation happen every time an action is triggered? Why would this be required at all?

I understand that the file is read only when an action is performed.

Now, when the count() action is called first, the file is read into memory and a DF is built, so I expect it to stay in memory. Why would the saveAsTable() action read the file again and reconstruct the DF using the DAG?

If it re-evaluates the DF (using the DAG) every time we use the DF reference in our code, is it not time-consuming to reconstruct the DF every time? And if we avoid this reconstruction by persisting the DFs, will we not end up occupying all the memory, spilling some data to disk, and slowing down the Spark application?

Can anyone explain what's going on?

Upvotes: 0

Views: 403

Answers (1)

spartan

Reputation: 88

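Spark groups the operations of a job into "stages" in its DAG, and stage boundaries fall at "shuffle" operations.

Operations within a stage are grouped together because they work on data that is co-located in a single partition.

So whenever an "action" (count() and saveAsTable() in this case) is performed, it triggers re-computation of everything leading up to that DF, starting from the file read, unless the DF is cached. In your sequence, filter is a transformation and count() and saveAsTable() are actions, so without caching the CSV file is read twice, once per action (schema inference, if enabled, adds a further pass over the file).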

val raw = spark.read.format("csv").load("/path/file.csv")

val df = raw.filter(....)  // a val cannot be reassigned, so bind the filtered DF to a new name

// df.cache() // uncomment this if the read and filter should run only once

val cnt = df.count()  // action: triggers the read and filter

df.write.saveAsTable(....)   // action: triggers the read and filter again (unless cached)
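To see why each action repeats the read, here is a toy model in plain Scala. It is not Spark code and not how Spark is implemented; `LazyFrame`, `LazyDemo` and `readCount` are hypothetical names made up for this sketch. It only mimics the idea that a DF is a recipe (lineage) that every action re-runs, unless caching was requested.

```scala
// Toy model of lazy evaluation; NOT Spark code. All names are hypothetical.
final class LazyFrame[A](compute: () => Seq[A]) {
  private var cached: Option[Seq[A]] = None
  private var cacheRequested = false

  // Transformation: only records a new step in the lineage, runs nothing.
  def filter(p: A => Boolean): LazyFrame[A] =
    new LazyFrame(() => rows().filter(p))

  // Lazy as well: just marks the result to be kept after its first computation.
  def cache(): this.type = { cacheRequested = true; this }

  // Action: forces the whole lineage to run.
  def count(): Int = rows().size

  private def rows(): Seq[A] = cached.getOrElse {
    val result = compute()
    if (cacheRequested) cached = Some(result)
    result
  }
}

object LazyDemo {
  var readCount = 0 // how many times the "file" was read

  // Stand-in for reading the CSV: bumps the counter on every evaluation.
  def source(): LazyFrame[Int] =
    new LazyFrame(() => { readCount += 1; Seq(1, 2, 3, 4) })

  def main(args: Array[String]): Unit = {
    val uncached = source().filter(_ % 2 == 0)
    uncached.count(); uncached.count()
    println(s"reads without cache: $readCount") // prints 2

    readCount = 0
    val cachedDf = source().filter(_ % 2 == 0).cache()
    cachedDf.count(); cachedDf.count()
    println(s"reads with cache: $readCount") // prints 1
  }
}
```

Note that nothing is kept in memory after the first action unless caching was asked for beforehand; the frame object itself holds only the recipe, not the rows.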

Regarding your questions:

If it re-evaluates the DF (using the DAG) every time we use the DF reference in our code, is it not time-consuming to reconstruct the DF every time?

  • Yes, it's wasteful, and that's why it's advisable to cache the DF if it's used more than once.

And if we avoid this reconstruction by persisting the DFs, will we not end up occupying all the memory, spilling some data to disk, and slowing down the Spark application?

  • I think you answered it already: yes, memory is a limited resource, which is why DFs are not persisted by default. However, when you persist there are multiple storage levels to choose from, such as MEMORY_ONLY and MEMORY_AND_DISK, among others. The Spark documentation on storage levels has the details.
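As a rough sketch of choosing a storage level explicitly and releasing it afterwards, something like the following should work. The sample CSV contents, the filter condition, the table name `filtered_people` and the local master are all assumptions made for illustration; only `persist`, `unpersist`, `StorageLevel` and the reader/writer calls are Spark's own API. For DataFrames, `cache()` is shorthand for `persist(StorageLevel.MEMORY_AND_DISK)`.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.storage.StorageLevel

object PersistSketch {
  // Returns the row count; also writes the filtered rows to a table.
  def run(spark: SparkSession): Long = {
    // Hypothetical input: a small temporary CSV so the sketch is self-contained.
    val csvPath = Files.createTempFile("people", ".csv")
    Files.write(csvPath, "1,alice\n2,bob\n3,carol\n".getBytes("UTF-8"))

    // Without a header, Spark names CSV columns _c0, _c1, ...
    val filtered = spark.read.format("csv").load(csvPath.toString)
      .filter(col("_c1") =!= "bob") // hypothetical condition

    // Lazy: marks the DF for caching; nothing is read yet.
    filtered.persist(StorageLevel.MEMORY_AND_DISK)

    // First action: reads the file, applies the filter, fills the cache.
    val cnt = filtered.count()

    // Second action: can be served from the cached data instead of the file.
    filtered.write.mode("overwrite").saveAsTable("filtered_people")

    // Release the cached data once the DF is no longer needed.
    filtered.unpersist()
    cnt
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("persist-sketch")
      .master("local[*]") // assumption: local run for illustration
      .getOrCreate()
    println(run(spark))
    spark.stop()
  }
}
```

Calling unpersist() as soon as the last action that needs the DF has finished is what keeps persisted DFs from piling up in memory over a long job.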

Upvotes: 1
