Jake
Jake

Reputation: 4660

spark: Attempted to use Broadcast after it was destroyed

The following code works

 @throws(classOf[IKodaMLException])
 def soMergeTarget1( oldTargetIdx: Double, newTargetIdx: Double): RDDLabeledPoint =
 {
   try
   {
    logger.trace("\n\n--sparseOperationRenameTargetsInNumeriOrder--\n\n")
    val oldTargetIdxb=spark.sparkContext.broadcast(oldTargetIdx)
    val newTargetIdxb=spark.sparkContext.broadcast(newTargetIdx)

    val newdata:RDD[(LabeledPoint,Int,String)] = sparseData.map
    {
      r =>

        val currentLabel: Double = r._1.label
        currentLabel match
        {
          case x if x == oldTargetIdxb.value =>
          val newtrgt=newTargetIdxb.value
          (new LabeledPoint(newtrgt, r._1.features), r._2, r._3)
          case _ => r
        }
    }
  val newtargetmap=ilp.targetMap.filter(e=> !(e._2 == oldTargetIdx))
  oldTargetIdxb.destroy
  newTargetIdxb.destroy
  new RDDLabeledPoint(newdata,copyColumnMap,newtargetmap,ilp.name)
}

But, having destroyed the broadcast variables at the end of the method, the newtrgt variable in the RDD is also destroyed. The trouble is that once the RDD is returned from this method it could be used by any analyst in any code. So, I seem to have lost all control of the broadcast variables.

Questions:

If I don't destroy the variables, will spark destroy them when reference to the RDD disappears?

(Perhaps a naive question but....) I tried a little hack val newtrgt=oldTargetIdxb.value + 1 -1 thinking that might create a new reference that is distinct from the broadcast variable. It didn't work. I must admit that surprised me. Can someone explain why the hack didn't work (I'm not suggesting it was a good idea, but I am curious).

Upvotes: 0

Views: 1876

Answers (1)

Jake
Jake

Reputation: 4660

I found an answer here

Not my answer but worth sharing on SO...and why can't I see this in Spark documentation. It's important:

Sean Owen:

you want to actively unpersist() or destroy() broadcast variables when they're no longer needed. They can eventually be removed when the reference on the driver is garbage collected, but you usually would not want to rely on that.

Follow up question:

Thank you for the response. The only problem is that actively managing broadcast variables require to return the broadcast variables to the caller if the function that creates the broadcast variables does not contain any action. That is the scope that uses the broadcast variables cannot destroy the broadcast variables in many cases. For example:

==============

def perfromTransformation(rdd: RDD[int]) = {
   val sharedMap = sc.broadcast(map)
   rdd.map{id => 
      val localMap = sharedMap.vlaue
      (id, localMap(id))
   }
}

def main = {
    ....
    performTransformation(rdd).toDF("id", "i").write.parquet("dummy_example")
}

==============

In this example above, we cannot destroy the sharedMap before the write.parquet is executed because RDD is evaluated lazily. We will get a exception

Sean Owen:

Yes, although there's a difference between unpersist and destroy, you'll hit the same type of question either way. You do indeed have to reason about when you know the broadcast variable is no longer needed in the face of lazy evaluation, and that's hard.

Sometimes it's obvious and you can take advantage of this to proactively free resources. You may have to consider restructuring the computation to allow for more resources to be freed, if this is important to scale.

Keep in mind that things that are computed and cached may be lost and recomputed even after their parent RDDs were definitely already computed and don't seem to be needed. This is why unpersist is often the better thing to call because it allows for variables to be rebroadcast if needed in this case. Destroy permanently closes the broadcast.

Upvotes: 2

Related Questions