Mohan

Reputation: 897

How to check whether a DataFrame's data is actually cached, or not yet cached due to lazy execution, in PySpark?

My question is a little different from the other questions I could find on Stack Overflow. I need to know whether the data has already been retrieved and stored in a DataFrame, or whether that is yet to happen.

I am doing something like this:

df1=spark.table("sourceDB.Table1")
df1.cache()

Now, as you might be aware, the data has not yet been read from the source table because of lazy execution. So I need an expression that evaluates to False at this point.

Some time later, I run an operation that requires the data to be read from the source. For example:

from pyspark.sql.functions import sum  # Spark's aggregate sum, not Python's built-in

df1.groupBy("col3").agg(sum("col1").alias("sum_of_col1")).select("sum_of_col1","col3").filter("sum_of_col1 >= 100").show()

At this point, the data must have been read and stored in the cache for df1. So I need an expression that evaluates to True at this point.

Is there any way to achieve this? I believe df1.is_cached will not help in this situation, since it only reflects that cache() was called.
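To illustrate why df1.is_cached is not enough, here is a minimal sketch. It assumes classic (non-Connect) PySpark, where is_cached is a plain flag on the Python DataFrame object. The helper name cache_was_requested and the demo function are hypothetical, and spark.range(10) is only a stand-in for the source table.

```python
from typing import Any


def cache_was_requested(df: Any) -> bool:
    """Return True once cache()/persist() has been called on df.

    This only reads the is_cached flag; it says nothing about whether
    any partition has actually been computed and stored yet.
    """
    return bool(df.is_cached)


def demo() -> None:
    # Needs a local Spark installation; imported here so the helper above
    # can be used without importing pyspark at module load time.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df1 = spark.range(10)  # stand-in for spark.table("sourceDB.Table1")

    print(cache_was_requested(df1))  # before cache()
    df1.cache()
    print(cache_was_requested(df1))  # after cache(), before any action
    df1.count()                      # an action that materializes the cache
    print(cache_was_requested(df1))  # after the action

    spark.stop()
```

Calling demo() should show the flag changing right after cache(), not after the first action, which is why a different check is needed to tell the two states apart.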

Upvotes: 1

Views: 2416

Answers (2)

surj

Reputation: 4913

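PySpark version (loosely based on the answer from @Som). Note that it reaches into private attributes (_jsparkSession, _jdf) and Spark internals through the JVM gateway, so it may break between Spark versions.

from typing import Any

from pyspark.sql import DataFrame, Row


def is_cached(df: DataFrame) -> bool:
    jspark: Any = df.sparkSession._jsparkSession
    jdf: Any = df._jdf
    # Logical plan that the cache manager uses as the lookup key
    plan = jdf.queryExecution().logical()
    # Scala Option: empty unless cache()/persist() was called on this plan
    cache = jspark.sharedState().cacheManager().lookupCachedData(plan)
    return (
        cache.nonEmpty() and
        # True only once the cached column buffers have been materialized
        cache.get().cachedRepresentation().cacheBuilder().isCachedColumnBuffersLoaded()
    )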

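df = spark.createDataFrame([Row(id=1)])
is_cached(df) # False: cache() has not been called
df.count()
is_cached(df) # False: an action alone does not cache anything
df.cache()
is_cached(df) # False: cache() is lazy, nothing is stored yet
df.count()
is_cached(df) # True: the action materialized the cache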

Upvotes: 2

Som

Reputation: 6338

Perhaps this is useful:

1. If you want to check whether cache/persist has already been issued on the DataFrame, you can use the cache manager to confirm that, as below:

spark.sharedState.cacheManager.lookupCachedData(df.queryExecution.logical).nonEmpty

2. If you want to check whether the data is actually in memory, the method below may be helpful:

import org.apache.spark.sql.DataFrame

def checkIfDataIsInMemory(df: DataFrame): Boolean = {
  val manager = df.sparkSession.sharedState.cacheManager
  // Step 1: check whether cache()/persist() was issued on this DataFrame
  manager.lookupCachedData(df.queryExecution.logical) match {
    case Some(cachedData) =>
      println("Cache statement is already issued on this dataframe")
      // Step 2: check whether the cached buffers have actually been loaded
      cachedData.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
    case None => false
  }
}

3. Test the above method:

val df = spark.read
  .parquet(getClass.getResource("/parquet/plain/part-00000-4ece3595-e410-4301-aefd-431cd1debf91-c000.snappy" +
    ".parquet").getPath)
println(checkIfDataIsInMemory(df))
/**
  * false
  */

df.cache()
// check if the data is cached
println(checkIfDataIsInMemory(df))
/**
  * Cache statement is already issued on this dataframe
  * false
  */

println(df.count())
println(checkIfDataIsInMemory(df))
/**
  * 1
  * Cache statement is already issued on this dataframe
  * true
  */

Upvotes: 4
