Reputation: 423
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
This code snippet is from spark 2.2 source code. I am not professional in scala, so just wondering can anyone explain this code in a programmatic perspective? I am not sure what the square bracket after map does. And also refer to https://www.tutorialspoint.com/scala/scala_functions.htm, a scala function should just have curly bracket after '=', but why in this code snippet there is a function named withScope after the '=' sign?
Upvotes: 0
Views: 245
Reputation: 670
actually, a scala function can have no bracket after "=". eg.
def func():Int = 1
so you can think withScope{} is a function with return type RDD[U],and the map
function is to run the withScope function.
let's see the withScope's source code:
private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](sc)(body)
see, it's a function here. let's go on:
private[spark] def withScope[T](
sc: SparkContext,
allowNesting: Boolean = false)(body: => T): T = {
val ourMethodName = "withScope"
val callerMethodName = Thread.currentThread.getStackTrace()
.dropWhile(_.getMethodName != ourMethodName)
.find(_.getMethodName != ourMethodName)
.map(_.getMethodName)
.getOrElse {
// Log a warning just in case, but this should almost certainly never happen
logWarning("No valid method name for this RDD operation scope!")
"N/A"
}
withScope[T](sc, callerMethodName, allowNesting, ignoreParent = false)(body)
}
let continue with the withScope
at the end:
private[spark] def withScope[T](
sc: SparkContext,
name: String,
allowNesting: Boolean,
ignoreParent: Boolean)(body: => T): T = {
// Save the old scope to restore it later
val scopeKey = SparkContext.RDD_SCOPE_KEY
val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
val oldScopeJson = sc.getLocalProperty(scopeKey)
val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson)
val oldNoOverride = sc.getLocalProperty(noOverrideKey)
try {
if (ignoreParent) {
// Ignore all parent settings and scopes and start afresh with our own root scope
sc.setLocalProperty(scopeKey, new RDDOperationScope(name).toJson)
} else if (sc.getLocalProperty(noOverrideKey) == null) {
// Otherwise, set the scope only if the higher level caller allows us to do so
sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
}
// Optionally disallow the child body to override our scope
if (!allowNesting) {
sc.setLocalProperty(noOverrideKey, "true")
}
body
} finally {
// Remember to restore any state that was modified before exiting
sc.setLocalProperty(scopeKey, oldScopeJson)
sc.setLocalProperty(noOverrideKey, oldNoOverride)
}
}
in the end, it execute body
parameter, int this case, body equals to
{
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
in conclusion, withScope is a Closure takes a function as argument, it first runs some code itself and the run the argument.
Upvotes: 1