Reputation: 897
My question is a little different from other questions I could find on Stack Overflow. I need to know whether the data has already been retrieved and stored in a dataframe, or whether that is yet to happen.
I am doing something like this:
df1 = spark.table("sourceDB.Table1")
df1.cache()
Now, as you might be aware, the data has not yet been read from the source table due to lazy execution. So I need an expression here that returns "False" at this point.
After some time, I perform an operation that requires the data to be retrieved from the source. For example:
df1.groupBy("col3").agg(sum("col1").alias("sum_of_col1")).select("sum_of_col1","col3").filter("sum_of_col1 >= 100").show()
At this point, the data must have been read and stored in the cache for df1. So I need an expression here that returns "True" at this point.
Is there any way we can achieve this? I believe df1.is_cached will not help in this situation.
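For reference, is_cached flips to True as soon as cache() is called, before any data has been read, so it cannot tell the two states apart. A quick check on the df1 above (the exact storageLevel output varies by Spark version):
print(df1.is_cached)     # True already -- caching was merely requested
print(df1.storageLevel)  # a storage level is set, but no job has run yet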
Upvotes: 1
Views: 2416
Reputation: 4913
PySpark version (loosely based on the answer from @som):
from typing import Any

from pyspark.sql import DataFrame, Row

def is_cached(df: DataFrame) -> bool:
    jspark: Any = df.sparkSession._jsparkSession
    jdf: Any = df._jdf
    # Look up this dataframe's logical plan in Spark's CacheManager
    plan = jdf.queryExecution().logical()
    cache = jspark.sharedState().cacheManager().lookupCachedData(plan)
    return (
        # nonEmpty: cache()/persist() has been called on this plan...
        cache.nonEmpty() and
        # ...and an action has actually materialized the cached column buffers
        cache.get().cachedRepresentation().cacheBuilder().isCachedColumnBuffersLoaded()
    )
df = spark.createDataFrame([Row(id=1)])
is_cached(df)  # False
df.count()
is_cached(df)  # False -- count() alone does not populate the cache
df.cache()
is_cached(df)  # False -- cache() is lazy; nothing is materialized yet
df.count()
is_cached(df)  # True -- the action materialized the cached buffers
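As a follow-up, unpersist() removes the plan's entry from the CacheManager, so the same check flips back to False (a sketch, continuing the session above):
df.unpersist()
is_cached(df)  # False -- the CacheManager no longer holds this plan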
Upvotes: 2
Reputation: 6338
Perhaps this is useful
1. If you want to check whether cache/persist has already been triggered on the dataframe, you can use the CacheManager to confirm that, as below-
spark.sharedState.cacheManager.lookupCachedData(df.queryExecution.logical).nonEmpty
2. If you want to check whether the data is actually present in memory, perhaps the below method would be helpful-
def checkIfDataIsInMemory(df: DataFrame): Boolean = {
  val manager = df.sparkSession.sharedState.cacheManager
  // step 1 - check whether dataframe.cache was issued earlier or not
  if (manager.lookupCachedData(df.queryExecution.logical).nonEmpty) { // cache statement was already issued
    println("Cache statement is already issued on this dataframe")
    // step 2 - check whether the data is in memory or not
    val cacheData = manager.lookupCachedData(df.queryExecution.logical).get
    cacheData.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
  } else false
}
3. Test the above method-
val df = spark.read
  .parquet(getClass.getResource("/parquet/plain/part-00000-4ece3595-e410-4301-aefd-431cd1debf91-c000.snappy" +
    ".parquet").getPath)
println(checkIfDataIsInMemory(df))
/**
* false
*/
df.cache()
// check if the data is cached
println(checkIfDataIsInMemory(df))
/**
* Cache statement is already issued on this dataframe
* false
*/
println(df.count())
println(checkIfDataIsInMemory(df))
/**
* 1
* Cache statement is already issued on this dataframe
* true
*/
Upvotes: 4