Reputation: 793
I cannot understand what the withScope method does (actually, I do not really understand the purpose of the RDDOperationScope class either).
In particular, what does (body: => T) mean in the parameter list of withScope?
private[spark] def withScope[T](
    sc: SparkContext,
    name: String,
    allowNesting: Boolean,
    ignoreParent: Boolean)(body: => T): T = {
  // Save the old scope to restore it later
  val scopeKey = SparkContext.RDD_SCOPE_KEY
  val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
  val oldScopeJson = sc.getLocalProperty(scopeKey)
  val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson)
  val oldNoOverride = sc.getLocalProperty(noOverrideKey)
  try {
    if (ignoreParent) {
      // Ignore all parent settings and scopes and start afresh with our own root scope
      sc.setLocalProperty(scopeKey, new RDDOperationScope(name).toJson)
    } else if (sc.getLocalProperty(noOverrideKey) == null) {
      // Otherwise, set the scope only if the higher level caller allows us to do so
      sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
    }
    // Optionally disallow the child body to override our scope
    if (!allowNesting) {
      sc.setLocalProperty(noOverrideKey, "true")
    }
    body
  } finally {
    // Remember to restore any state that was modified before exiting
    sc.setLocalProperty(scopeKey, oldScopeJson)
    sc.setLocalProperty(noOverrideKey, oldNoOverride)
  }
}
The source code is here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
Can anyone help? Thanks, I have been confused by this for a long time.
Upvotes: 8
Views: 2484
Reputation: 1683
You need to look at how withScope is called. Here is an example from RDD.scala:
/**
 * Return a new RDD by first applying a function to all elements of this
 * RDD, and then flattening the results.
 */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
Basically, it creates a new scope (a named code block), so that the RDDs created by the current operation are tracked separately from those of the calling function. The body of the scope is the block passed after withScope, which in this case is:
{
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
I am yet to get to the point where the old scope is restored.
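For what it is worth, in the withScope quoted in the question the restoration happens in the finally block, which runs after body has been evaluated. Here is a minimal sketch of that save, run, restore pattern. It does not use Spark at all: ScopeSketch, props, ScopeKey and currentScope are hypothetical names, and a plain mutable map stands in for the SparkContext local properties.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the pattern in RDDOperationScope.withScope; not Spark's API.
object ScopeSketch {
  // A plain map plays the role of SparkContext's local properties (assumption).
  private val props = mutable.Map.empty[String, String]
  val ScopeKey = "scope" // hypothetical key name

  def currentScope: Option[String] = props.get(ScopeKey)

  // `body: => T` is a by-name parameter: the caller's block is not evaluated
  // at the call site, only where `body` is referenced inside this method.
  def withScope[T](name: String)(body: => T): T = {
    val oldScope = props.get(ScopeKey) // save the old scope
    try {
      // Nest the new scope under the old one, if there is one
      props(ScopeKey) = oldScope.map(_ + "/" + name).getOrElse(name)
      body // the caller's block runs here, with the new scope in effect
    } finally {
      // Restore the old scope, even if body threw an exception
      oldScope match {
        case Some(s) => props(ScopeKey) = s
        case None    => props.remove(ScopeKey)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val seen = withScope("outer") {
      withScope("inner") { currentScope }
    }
    println(seen)         // Some(outer/inner)
    println(currentScope) // None, the scope has been restored
  }
}
```

Because body is passed by name, the block only runs after the new scope has been set, which is why code inside the block sees "outer/inner" while code after the call sees the old value again.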
Upvotes: 1
Reputation: 86
The following code may help you:
object TestWithScope {
  def withScope(func: => String) = {
    println("withscope")
    func
  }

  def bar(foo: String) = withScope {
    println("Bar: " + foo)
    "BBBB"
  }

  def main(args: Array[String]): Unit = {
    println(bar("AAAA"))
  }
}
Possible output
withscope
Bar: AAAA
BBBB
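The output above shows "withscope" printed before "Bar: AAAA" because func is a by-name parameter (func: => String), so the block is evaluated only when func is referenced inside withScope. As a rough illustration, here is a small sketch comparing a by-value and a by-name parameter. All names in it (ByNameSketch, byValue, byName, compute, log) are made up for this example.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical example contrasting by-value and by-name parameters.
object ByNameSketch {
  // Records the order in which things happen
  val log = ListBuffer.empty[String]

  // By-value: the argument is evaluated before the method body starts
  def byValue(x: String): String = { log += "enter byValue"; x }

  // By-name: the argument is evaluated only when `x` is referenced
  def byName(x: => String): String = { log += "enter byName"; x }

  def compute(tag: String): String = { log += s"compute $tag"; tag }

  def main(args: Array[String]): Unit = {
    byValue(compute("v"))
    byName(compute("n"))
    log.foreach(println)
    // compute v
    // enter byValue
    // enter byName
    // compute n
  }
}
```

With the by-value version the argument is computed first. With the by-name version the method is entered first and the argument is computed later, which is what lets withScope do its setup before running the block.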
Upvotes: 7