yanik1984

Reputation: 31

Spark caching difference between 2.0.2 and 2.1.1

I'm a bit confused by Spark's caching behavior. I want to compute a dependent dataset (b), cache it, and unpersist the source dataset (a). Here is my code:

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("test").master("local[4]").getOrCreate()
import spark.implicits._
val a = spark.createDataset(Seq(("a", 1), ("b", 2), ("c", 3)))
a.createTempView("a")
a.cache
println(s"Is a cached: ${spark.catalog.isCached("a")}")
val b = a.filter(x => x._2 < 3)
b.createTempView("b")
// calling action
b.cache.first
println(s"Is b cached: ${spark.catalog.isCached("b")}")

spark.catalog.uncacheTable("a")
println(s"Is b cached after a was unpersisted: ${spark.catalog.isCached("b")}")

With Spark 2.0.2 it works as expected:

Is a cached: true
Is b cached: true
Is b cached after a was unpersisted: true

But on 2.1.1:

Is a cached: true
Is b cached: true
Is b cached after a was unpersisted: false

How can I achieve the same behavior in 2.1.1?

Thank you.
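One possible workaround, offered as an unverified sketch rather than a confirmed fix for 2.1.1: cut b's lineage from a before caching it, so that uncaching a has nothing in b's logical plan to match. Dataset.checkpoint() (added in Spark 2.1.0 and marked experimental) is eager by default and returns a Dataset whose plan reads the checkpointed data instead of a's plan. The checkpoint directory /tmp/spark-checkpoints is a placeholder assumption; any writable path should do.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("test").master("local[4]").getOrCreate()
import spark.implicits._

// Placeholder path (assumption): any writable directory works in local mode.
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

val a = spark.createDataset(Seq(("a", 1), ("b", 2), ("c", 3)))
a.createOrReplaceTempView("a")
a.cache()

// checkpoint() is eager by default: it runs a job, writes the filtered rows,
// and returns a Dataset whose logical plan no longer references a.
val b = a.filter(x => x._2 < 3).checkpoint()
b.createOrReplaceTempView("b")
b.cache().first()

// Uncaching a should not touch b's cache entry, since b's plan does not contain a's plan.
spark.catalog.uncacheTable("a")
val bStillCached = spark.catalog.isCached("b")
println(s"Is b cached after a was unpersisted: $bStillCached")
```

The cost is an extra write of b to the checkpoint directory, so it is worth confirming on your own cluster that this matches the 2.0.2 behavior you rely on.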



Answers (1)

gasparms

Reputation: 3354

I don't know what the intended behavior is. According to Spark's own tests, the 2.1.1 behavior is the expected one, but a couple of comments in those tests express some doubt about it. You could open a JIRA issue in the Spark project to get this clarified.

CachedTableSuite.scala

test("uncaching temp table") {
  testData.select('key).createOrReplaceTempView("tempTable1")
  testData.select('key).createOrReplaceTempView("tempTable2")
  spark.catalog.cacheTable("tempTable1")

  assertCached(sql("SELECT COUNT(*) FROM tempTable1"))
  assertCached(sql("SELECT COUNT(*) FROM tempTable2"))

  // Is this valid?
  spark.catalog.uncacheTable("tempTable2")

  // Should this be cached?
  assertCached(sql("SELECT COUNT(*) FROM tempTable1"), 0)
}

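The assertCached method checks that the number of cached results (InMemoryRelation nodes) in the query's plan equals its second argument, numCachedTables, which defaults to 1. So the last assertion in the test expects that, after uncaching tempTable2, a query on tempTable1 uses no cached data at all: both views are defined by the same query (testData.select('key)), so uncaching one also drops the cache used by the other.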

QueryTest.scala

/**
 * Asserts that a given [[Dataset]] will be executed using the given number of cached results.
 */
def assertCached(query: Dataset[_], numCachedTables: Int = 1): Unit = {
  val planWithCaching = query.queryExecution.withCachedData
  val cachedData = planWithCaching collect {
    case cached: InMemoryRelation => cached
  }

  assert(
    cachedData.size == numCachedTables,
    s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n" +
    planWithCaching)
}
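To see what happens in your own example, the same check that assertCached performs can be applied to view b. The helper numCachedRelations below is hypothetical (it is not a Spark API); it relies on Dataset.queryExecution.withCachedData and InMemoryRelation, which are developer-facing internals and may change between versions. A result of 1 means queries on b would read b's cached data; 0 means no cached data would be used.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.execution.columnar.InMemoryRelation

// Hypothetical helper mirroring QueryTest.assertCached: counts the cached
// relations Spark would substitute into the plan of `query`.
def numCachedRelations(query: Dataset[_]): Int =
  query.queryExecution.withCachedData.collect { case r: InMemoryRelation => r }.size

val spark = SparkSession.builder().appName("test").master("local[4]").getOrCreate()
import spark.implicits._

val a = spark.createDataset(Seq(("a", 1), ("b", 2), ("c", 3)))
a.createOrReplaceTempView("a")
a.cache()
val b = a.filter(x => x._2 < 3)
b.createOrReplaceTempView("b")
b.cache().first()
spark.catalog.uncacheTable("a")

// Re-read the view so a fresh plan is built against the current cache state.
val cachedInB = numCachedRelations(spark.table("b"))
println(s"Cached relations used by b: $cachedInB")
```

No output is shown here on purpose: the value is exactly what differs between the versions discussed above.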

