kz28

Reputation: 793

Spark source code: how to understand the withScope method

I cannot understand what the withScope method does (actually, I do not really understand the purpose of the RDDOperationScope class).

In particular, what does (body: => T) mean in the parameter list of the withScope method?

private[spark] def withScope[T](
    sc: SparkContext,
    name: String,
    allowNesting: Boolean,
    ignoreParent: Boolean)(body: => T): T = {
  // Save the old scope to restore it later
  val scopeKey = SparkContext.RDD_SCOPE_KEY
  val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
  val oldScopeJson = sc.getLocalProperty(scopeKey)
  val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson)
  val oldNoOverride = sc.getLocalProperty(noOverrideKey)
  try {
    if (ignoreParent) {
      // Ignore all parent settings and scopes and start afresh with our own root scope
      sc.setLocalProperty(scopeKey, new RDDOperationScope(name).toJson)
    } else if (sc.getLocalProperty(noOverrideKey) == null) {
      // Otherwise, set the scope only if the higher level caller allows us to do so
      sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
    }
    // Optionally disallow the child body to override our scope
    if (!allowNesting) {
      sc.setLocalProperty(noOverrideKey, "true")
    }
    body
  } finally {
    // Remember to restore any state that was modified before exiting
    sc.setLocalProperty(scopeKey, oldScopeJson)
    sc.setLocalProperty(noOverrideKey, oldNoOverride)
  }
}
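To see the save/set/restore logic in isolation, here is a minimal sketch. It assumes a plain mutable Map (the hypothetical props) in place of SparkContext's local properties, a missing key in place of null, and a "parent/child" string in place of RDDOperationScope's JSON; ignoreParent is left out. It shows that an inner call nests under the outer scope, that allowNesting = false stops the inner call from overriding it, and that the finally block restores the previous state on exit.

```scala
import scala.collection.mutable

object ScopeSketch {
  // Hypothetical stand-in for SparkContext's local properties (not Spark's API)
  val props = mutable.Map.empty[String, String]
  val ScopeKey = "scope"           // plays the role of RDD_SCOPE_KEY
  val NoOverrideKey = "noOverride" // plays the role of RDD_SCOPE_NO_OVERRIDE_KEY

  // Put back a saved value; None means the key was unset before, so remove it
  private def restore(key: String, old: Option[String]): Unit =
    old match {
      case Some(v) => props(key) = v
      case None    => props.remove(key)
    }

  def withScope[T](name: String, allowNesting: Boolean = true)(body: => T): T = {
    // Save the old state so it can be restored on exit
    val oldScope = props.get(ScopeKey)
    val oldNoOverride = props.get(NoOverrideKey)
    try {
      // Only set a scope if no enclosing call has forbidden overriding it
      if (!props.contains(NoOverrideKey)) {
        props(ScopeKey) = oldScope.fold(name)(parent => s"$parent/$name")
      }
      if (!allowNesting) props(NoOverrideKey) = "true"
      body // evaluated here, while the new scope is active
    } finally {
      // Runs even if body throws
      restore(ScopeKey, oldScope)
      restore(NoOverrideKey, oldNoOverride)
    }
  }

  def currentScope: Option[String] = props.get(ScopeKey)

  def main(args: Array[String]): Unit = {
    val seen = withScope("outer") {
      val inner = withScope("inner")(currentScope)
      (currentScope, inner)
    }
    println(seen)         // (Some(outer),Some(outer/inner))

    val blocked = withScope("outer", allowNesting = false) {
      withScope("inner")(currentScope)
    }
    println(blocked)      // Some(outer): the inner call could not override
    println(currentScope) // None: everything restored after the outer calls
  }
}
```

The real method follows the same shape; the try/finally is what guarantees that the old scope comes back once body has finished.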

You can find the source code at this link: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala

Can anyone help? Thanks, I have been confused by this for a long time.

Upvotes: 8

Views: 2484

Answers (2)

Mahesh

Reputation: 1683

You need to look at how withScope is called. Here is an example from RDD.scala:

  /**
   *  Return a new RDD by first applying a function to all elements of this
   *  RDD, and then flattening the results.
   */
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
  }

Basically, it wraps a code block in a new named scope, so that the scope of the previous (calling) function is not mixed up with that of the current function. The body of the scope is the block passed after withScope, which in this case is:

      {
        val cleanF = sc.clean(f)
        new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
      }
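Why the body matters: as far as I can tell from RDD.scala, each RDD reads the current scope property (SparkContext.RDD_SCOPE_KEY) once, when it is constructed, and keeps it in a scope field, so every RDD created inside the block is tagged with the operation (here flatMap) that created it. The toy model below imitates that. FakeRDD, ScopeCaptureSketch and the simplified withScope (no nesting, no allowNesting) are hypothetical and not Spark's API.

```scala
import scala.collection.mutable

object ScopeCaptureSketch {
  // Hypothetical stand-in for SparkContext's local properties
  private val props = mutable.Map.empty[String, String]

  // Simplified: sets the scope to name, runs body, then restores the old value
  def withScope[U](name: String)(body: => U): U = {
    val old = props.get("scope")
    props("scope") = name
    try body
    finally old match {
      case Some(v) => props("scope") = v
      case None    => props.remove("scope")
    }
  }

  final class FakeRDD[T](val data: Seq[T]) {
    // Captured once, at construction time, from whatever scope is active
    val scope: Option[String] = props.get("scope")

    // Same shape as RDD.flatMap: the whole body runs inside withScope,
    // so the new FakeRDD is constructed while "flatMap" is the active scope
    def flatMap[U](f: T => Iterable[U]): FakeRDD[U] = withScope("flatMap") {
      new FakeRDD[U](data.flatMap(f))
    }
  }

  def main(args: Array[String]): Unit = {
    val lines = new FakeRDD(Seq("a b", "c"))
    val words = lines.flatMap(_.split(" ").toSeq)
    println(lines.scope) // None: created outside any scope
    println(words.scope) // Some(flatMap)
    println(words.data)  // List(a, b, c)
  }
}
```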

I have not yet worked through the part where the old scope is restored.

Upvotes: 1

ke xiong

Reputation: 86

The following code may help:

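object TestWithScope {
    // func is a by-name parameter (=> String): it is not evaluated at the
    // call site, only where it is referenced below, after "withscope" is printed
    def withScope(func: => String): String = {
        println("withscope")
        func
    }

    // The block after withScope is passed as func, unevaluated
    def bar(foo: String): String = withScope {
        println("Bar: " + foo)
        "BBBB"
    }

    def main(args: Array[String]): Unit = {
        println(bar("AAAA"))
    }
}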

Output:

withscope
Bar: AAAA
BBBB

Upvotes: 7
