Kenny

Reputation: 33

Spark Broadcast Variable

I tried the following code

val t1 = sc.parallelize(0 until 10)
val t2 = sc.broadcast(2)
val t3 = t1.filter(_ % t2.value == 0).persist()
t3.count()
t2.destroy()
t3.count()

It complains that "attempted to use Broadcast after it was destroyed" on the second t3.count(), which confuses me. If I understand correctly, we call persist on t3, so after the first t3.count(), t3 is stored in memory. If so, t3 does not need to be recomputed for the second t3.count(), and it should be safe to destroy t2. But that does not seem to be true. I wonder what is happening here.
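
For reference, a minimal sketch of the workaround I am considering (an assumption on my side, based on the distinction between the two Broadcast methods): unpersist() only drops the cached copies on the executors and the value can be re-sent if it is needed again, while destroy() removes the broadcast permanently.

val t1 = sc.parallelize(0 until 10)
val t2 = sc.broadcast(2)
val t3 = t1.filter(_ % t2.value == 0).persist()
t3.count()
// unpersist() only drops the executor-side copies; the broadcast can be
// re-sent if t3 ever has to be recomputed.
t2.unpersist()
t3.count()
// destroy() removes the broadcast for good; call it only when nothing
// will need t2 again.
t2.destroy()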

Upvotes: 3

Views: 453

Answers (1)

Ram Ghadiyaram

Reputation: 29237

Question: It complains that "attempted to use Broadcast after it was destroyed" on the second t3.count(), which confuses me. If I understand correctly, we call persist on t3, so after the first t3.count(), t3 is stored in memory. If so, t3 does not need to be recomputed for the second t3.count(), and it should be safe to destroy t2. But that does not seem to be true.


  • With spark-shell on Spark 2.4.0, I am also getting the same error.

But surprisingly, in a local IntelliJ Maven Scala project (with Spark 2.4.5 and Spark 2.2.2), the cache/persist use case does NOT throw this exception for me. There might be an issue in Spark, or there may be some other reason.

Case 1: Calling destroy without using cache/persist

  val t1 = sc.parallelize(0 until 10)
  val t2 = sc.broadcast(2)
  val t3 = t1.filter(_ % t2.value == 0)
  println(t3.count())
  t2.destroy()
  println(t3.count())

Since t3 is not cached or persisted, it has to be recomputed for the second count, which requires the destroyed broadcast, so you get the result below.

Result:

org.apache.spark.SparkException: Attempted to use Broadcast(0) after it was destroyed (destroy at BroadCastCheck.scala:20) 
    at org.apache.spark.broadcast.Broadcast.assertValid(Broadcast.scala:144)

Case 2: Calling destroy after using cache/persist
With cache/persist, the RDD t3 will not be recomputed, hence there is no error after destroy.

  val t1 = sc.parallelize(0 until 10)
  val t2 = sc.broadcast(2)
  val t3 = t1.filter(_ % t2.value == 0).cache() // or persist() as well
  println(t3.count())
  t2.destroy()
  println(t3.count())

Result:

5

5
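
As a side note, here is a minimal defensive sketch of the same pattern (my own variant, using the developer API getRDDStorageInfo; not part of the question): materialize t3 while the broadcast is still valid and destroy t2 only once t3's blocks are actually cached, so the second count can be served from the cache.

  val t1 = sc.parallelize(0 until 10)
  val t2 = sc.broadcast(2)
  val t3 = t1.filter(_ % t2.value == 0).cache()

  // Materialize the cached partitions while the broadcast is still valid.
  println(t3.count())

  // getRDDStorageInfo is a developer API; isCached is true once some of the
  // RDD's partitions are held by the block manager.
  val t3IsCached = sc.getRDDStorageInfo.exists(info => info.id == t3.id && info.isCached)
  if (t3IsCached) t2.destroy()

  println(t3.count())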

Upvotes: 1
