user13542005

Reputation: 25

How to persist a list built dynamically from a DataFrame in Scala Spark

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

def getAnimalName(dataFrame: DataFrame): List[String] = {
  dataFrame.select("animal").
    filter(col("animal").isNotNull && col("animal").notEqual("")).
    rdd.map(r => r.getString(0)).distinct().collect().toList
}

I call this function twice to get the list for two different purposes. Is there a way in Scala Spark to keep the list in memory, so that it is generated only once instead of calling the same function again and again?

Upvotes: 2

Views: 474

Answers (2)

QuickSilver

Reputation: 4045

Try something like the code below. You can also compare the performance of the two calls with the time helper. The code is explained in the inline comments.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{DataFrame, SparkSession}

object HandleCachedDF {

  // Holds the cached RDD after the first call; null until then
  var cachedAnimalRDD: RDD[String] = _

  def main(args: Array[String]): Unit = {
    // The original used a project-specific helper (Constant.getSparkSess);
    // a local SparkSession is built here instead
    val spark = SparkSession.builder().master("local[*]").appName("HandleCachedDF").getOrCreate()
    val df = spark.read.json("src/main/resources/hugeTest.json") // Load your DataFrame
    // cache() is lazy, so collect() must be inside time{}; otherwise only
    // building the RDD lineage is measured, not the actual computation
    val resultList = time { getAnimalName(df).collect().toList }  // First call: computes and caches
    val resultList1 = time { getAnimalName(df).collect().toList } // Second call: reads from the cache
    println(resultList == resultList1)
    spark.stop()
  }

  def getAnimalName(dataFrame: DataFrame): RDD[String] = {
    if (cachedAnimalRDD == null) { // First call: build the RDD and mark it for caching
      // Note: later calls return this RDD even if a different DataFrame is passed in
      cachedAnimalRDD = dataFrame.select("animal")
        .filter(col("animal").isNotNull && col("animal").notEqual(""))
        .rdd.map(r => r.getString(0)).distinct().cache() // Materialized on the first action
    }
    cachedAnimalRDD // Return the cached RDD
  }

  def time[R](block: => R): R = { // Measure the time taken to evaluate block
    val t0 = System.nanoTime()
    val result = block // call-by-name: block is evaluated here
    val t1 = System.nanoTime()
    println("Elapsed time: " + (t1 - t0) + "ns")
    result
  }

}
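A variant of the same compute-once idea, sketched under the assumption that the list is small enough to keep on the driver, caches the collected List[String] itself rather than the RDD. A lazy val is evaluated on first access and then reused, so the second access launches no Spark job at all. The class name AnimalNames, the computeCount counter and the sample data are hypothetical, for illustration only.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical helper: computes the distinct animal names once and
// returns the same List on every later access
class AnimalNames(dataFrame: DataFrame) {
  // Counts how many times the Spark query actually ran (illustration only)
  var computeCount = 0

  // lazy val: the block runs on first access only; the result is then stored
  lazy val names: List[String] = {
    computeCount += 1
    dataFrame.select("animal")
      .filter(col("animal").isNotNull && col("animal").notEqual(""))
      .distinct()
      .collect()
      .map(_.getString(0))
      .toList
  }
}

object AnimalNamesExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("AnimalNamesExample").getOrCreate()
    import spark.implicits._
    val df = Seq("cat", "dog", "", "cat").toDF("animal") // hypothetical sample data
    val animals = new AnimalNames(df)
    val first = animals.names  // runs the Spark job
    val second = animals.names // returns the stored List, no job
    println(first.sorted)         // List(cat, dog)
    println(first eq second)      // true
    println(animals.computeCount) // 1
    spark.stop()
  }
}
```

The trade-off: this holds the whole result in driver memory and never refreshes, so it only fits when the DataFrame does not change between uses.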

Upvotes: 1

Chema

Reputation: 2838

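You would have to persist (or cache) the RDD at this point and keep a reference to it:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.col

val animalRDD: RDD[String] = dataFrame.select("animal").
  filter(col("animal").isNotNull && col("animal").notEqual("")).
  rdd.map(r => r.getString(0)).distinct().persist()

and then define the function so that it takes the persisted RDD (calling collect on the DataFrame itself would return Array[Row], not List[String]):

def getAnimalName(animals: RDD[String]): List[String] = {
  animals.collect().toList
}

You can call it as many times as you need without repeating the filter and distinct steps: each collect() still runs a Spark job, but it reads the cached partitions instead of recomputing them. I hope it helps.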
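A self-contained sketch of the persist-then-reuse pattern above, with an explicit storage level and an unpersist() once the data is no longer needed. The object name, the sample data and the choice of MEMORY_ONLY (the level that cache() uses for RDDs) are assumptions for illustration.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.storage.StorageLevel

object PersistAnimalsExample {
  // Collects the already-persisted RDD; Spark reads the cached partitions
  // instead of re-running select/filter/distinct (a job is still launched per call)
  def getAnimalName(animals: RDD[String]): List[String] =
    animals.collect().toList

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("PersistAnimalsExample").getOrCreate()
    import spark.implicits._
    val dataFrame = Seq("cat", "dog", "", "cat").toDF("animal") // hypothetical sample data

    val animalRDD = dataFrame.select("animal")
      .filter(col("animal").isNotNull && col("animal").notEqual(""))
      .rdd.map(r => r.getString(0)).distinct()
      .persist(StorageLevel.MEMORY_ONLY) // same level cache() uses for RDDs

    val forReport = getAnimalName(animalRDD) // first action: computes and stores partitions
    val forLookup = getAnimalName(animalRDD) // reuses the stored partitions
    println(forReport.sorted == forLookup.sorted) // true

    animalRDD.unpersist() // release the cached partitions when no longer needed
    spark.stop()
  }
}
```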

Upvotes: 0
