Reputation: 56
I have cached the results of a dataframe (dfUpdates), but after a MERGE into ProdTable, the previously cached results in dfUpdates are different when I try to reuse them.
Below is a step-by-step explanation of what is happening.
Step-1:
df_Stage.count // 2113983
val df_Prodtable = spark.table("ProdTable") // Delta table
df_Prodtable.count // 2112563
val dfUpdates = df_Stage.join(df_Prodtable)
  .filter(/* multiple conditions here... */)
  .cache()
println(dfUpdates.count) // This action should cache dfUpdates in memory. The count is 130454, i.e. this many records need to be updated in ProdTable.
Step-2:
A MERGE statement here updates records in ProdTable (the Delta table) using records from the dfUpdates dataframe, as sketched below. All 130454 records are updated in ProdTable.
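For reference, a minimal sketch of the kind of Delta merge being described, using the Delta Lake Scala API; the actual match condition is not shown in the question, so the key column "id" is a placeholder:

import io.delta.tables.DeltaTable

DeltaTable.forName(spark, "ProdTable").as("target")
  .merge(dfUpdates.as("source"), "target.id = source.id") // "id" is a hypothetical key
  .whenMatched
  .updateAll()
  .execute()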
Step-3:
println(dfUpdates.count) // now returns zero records.
I understand that Step-3 triggers an action on the dfUpdates dataframe and that it gets recomputed (the join is re-evaluated against the updated table). How do I store the materialized data in memory so that I can use dfUpdates for downstream processing?
Upvotes: 2
Views: 311
Reputation: 6984
In your case, dfUpdates depends on ProdTable, so when you update ProdTable during the merge, Spark invalidates the dependent cache of dfUpdates, which is why the count changes.
Here is the JIRA that explains this behavior:
When invalidating a cache, we invalidate other caches dependent on this cache to ensure cached data is up to date. For example, when the underlying table has been modified or the table has been dropped itself, all caches that use this table should be invalidated or refreshed.
To work around this, save the intermediate result to disk and read it back. This is a common strategy to break the lineage (DAG) and avoid recomputation.
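A minimal sketch of that approach, assuming a hypothetical scratch path (/tmp/dfUpdates_snapshot) and Parquet as the intermediate format:

// Materialize the join result to disk BEFORE the merge;
// reading it back gives a dataframe whose lineage no longer
// points at ProdTable, so the merge cannot invalidate it.
dfUpdates.write.mode("overwrite").parquet("/tmp/dfUpdates_snapshot")
val dfUpdatesStable = spark.read.parquet("/tmp/dfUpdates_snapshot")

// ... run the MERGE into ProdTable here ...

println(dfUpdatesStable.count) // still 130454

Alternatively, dfUpdates.checkpoint() (after setting spark.sparkContext.setCheckpointDir(...)) achieves the same lineage break without managing the path yourself.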
Upvotes: 2