teddy

Reputation: 423

Need some explanation to understand the map function in Scala

  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

This code snippet is from the Spark 2.2 source code. I am not a Scala professional, so I'm just wondering: can anyone explain this code from a programming perspective? I am not sure what the square brackets after map do. Also, according to https://www.tutorialspoint.com/scala/scala_functions.htm, a Scala function should just have curly brackets after the '=', so why is there a function named withScope after the '=' sign in this snippet?

Upvotes: 0

Views: 245

Answers (1)

flyhighzy

Reputation: 670

First, about the square brackets: map[U: ClassTag] declares a type parameter U with a context bound, which means the compiler must be able to supply an implicit ClassTag[U] at the call site; that part is ordinary Scala generics syntax. As for the part after "=": actually, a Scala function can have no curly brackets after the "=" at all, e.g.

def func(): Int = 1

So you can think of withScope { ... } as a call to a function with return type RDD[U]; the body of map simply runs withScope, passing it the block in curly brackets.
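To see why the body of map can be just withScope { ... }, here is a minimal sketch of the same shape (all names here are made up for illustration, not Spark code): a method whose last parameter is a by-name block, which lets callers attach the block in curly brackets.

object ByNameDemo {
  // The second parameter list takes a by-name block (body: => T),
  // so a caller can write timed("...") { ... } as if it were built-in syntax.
  def timed[T](label: String)(body: => T): T = {
    println(s"start $label")
    val result = body            // the block is evaluated here, on demand
    println(s"end $label")
    result
  }

  def main(args: Array[String]): Unit = {
    // The curly brackets are just the body argument; timed returns the block's value.
    val n: Int = timed("demo") { 1 + 1 }
    println(n)                   // prints 2
  }
}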

Let's look at withScope's source code:

private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](sc)(body)

See, it's a function. It just forwards to another withScope overload, so let's go on:

private[spark] def withScope[T](
    sc: SparkContext,
    allowNesting: Boolean = false)(body: => T): T = {
  val ourMethodName = "withScope"
  val callerMethodName = Thread.currentThread.getStackTrace()
    .dropWhile(_.getMethodName != ourMethodName)
    .find(_.getMethodName != ourMethodName)
    .map(_.getMethodName)
    .getOrElse {
      // Log a warning just in case, but this should almost certainly never happen
      logWarning("No valid method name for this RDD operation scope!")
      "N/A"
    }
  withScope[T](sc, callerMethodName, allowNesting, ignoreParent = false)(body)
}

Let's continue with the withScope called at the end:

private[spark] def withScope[T](
    sc: SparkContext,
    name: String,
    allowNesting: Boolean,
    ignoreParent: Boolean)(body: => T): T = {
  // Save the old scope to restore it later
  val scopeKey = SparkContext.RDD_SCOPE_KEY
  val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
  val oldScopeJson = sc.getLocalProperty(scopeKey)
  val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson)
  val oldNoOverride = sc.getLocalProperty(noOverrideKey)
  try {
    if (ignoreParent) {
      // Ignore all parent settings and scopes and start afresh with our own root scope
      sc.setLocalProperty(scopeKey, new RDDOperationScope(name).toJson)
    } else if (sc.getLocalProperty(noOverrideKey) == null) {
      // Otherwise, set the scope only if the higher level caller allows us to do so
      sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
    }
    // Optionally disallow the child body to override our scope
    if (!allowNesting) {
      sc.setLocalProperty(noOverrideKey, "true")
    }
    body
  } finally {
    // Remember to restore any state that was modified before exiting
    sc.setLocalProperty(scopeKey, oldScopeJson)
    sc.setLocalProperty(noOverrideKey, oldNoOverride)
  }
}

In the end, it evaluates the body parameter. In this case, body is:

{
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

In conclusion, withScope is a method whose last argument is a by-name block (body: => T), so the curly brackets after withScope are just that argument. It first runs some code of its own, then evaluates the block, and restores the previous state before returning the block's result.
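If it helps, the whole mechanism boils down to this simplified sketch (my own reduction for illustration, not the actual Spark code): save some state, evaluate the caller's block, restore the state, and return the block's result.

object WithScopeDemo {
  // Stands in for the local property that Spark saves and restores.
  private var currentScope: Option[String] = None

  def withScope[T](name: String)(body: => T): T = {
    val oldScope = currentScope        // save the old scope, like getLocalProperty
    currentScope = Some(name)          // set the new scope, like setLocalProperty
    try body                           // evaluate the caller's block
    finally currentScope = oldScope    // always restore, like the finally block above
  }

  def main(args: Array[String]): Unit = {
    val result = withScope("map") {
      println(s"inside scope: $currentScope")   // Some(map)
      42
    }
    println(result)                    // 42; currentScope is None again
  }
}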

Upvotes: 1
