Paul R

Reputation: 63

Spark closure argument binding

I am working with Apache Spark in Scala.

I have a problem when trying to manipulate one RDD with data from a second RDD. I am trying to pass the 2nd RDD as an argument to a function being 'mapped' against the first RDD, but seemingly the closure created for that function binds an uninitialized version of that value.

Following is a simpler piece of code that shows the type of problem I'm seeing. (My real example where I first had trouble is larger and less understandable).

I don't really understand the argument binding rules for Spark closures.

What I'm really looking for is a basic approach or pattern for how to manipulate one RDD using the content of another (which was previously constructed elsewhere).

In the following code, calling Test1.process(sc) will fail with a null pointer access in findSquare, because the second argument bound into the closure is not initialized.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object Test1 {

  def process(sc: SparkContext) {
    val squaresMap = (1 to 10).map(n => (n, n * n))
    val squaresRDD = sc.parallelize(squaresMap)

    val primes = sc.parallelize(List(2, 3, 5, 7))

    for (p <- primes) {
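      // fails at runtime: findSquare tries to use squaresRDD inside a closure that runs on the executors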
      println("%d: %d".format(p, findSquare(p, squaresRDD)))
    }
  }

  def findSquare(n: Int, squaresRDD: RDD[(Int, Int)]): Int = {
    squaresRDD.filter(kv => kv._1 == n).first._1
  }
}

Upvotes: 4

Views: 1149

Answers (2)

zero323

Reputation: 330353

The problem you experience has nothing to do with closures or with RDDs, which, contrary to popular belief, are serializable.

It simply breaks a fundamental Spark rule which states that you cannot trigger an action or a transformation from another action or transformation*, and different variants of this question have been asked on SO multiple times.

To understand why that's the case you have to think about the architecture:

  • SparkContext is managed on the driver
  • everything that happens inside transformations is executed on the workers. Each worker has access only to its own part of the data and doesn't communicate with other workers**.

If you want to use the content of multiple RDDs, you have to use one of the transformations which combine RDDs, like join, cartesian, zip or union.

Here you most likely (I am not sure why you pass a tuple and use only the first element of this tuple) want to either use a broadcast variable:

val squaresMapBD = sc.broadcast(squaresMap)

def findSquare(n: Int): Seq[(Int, Int)] = {
  squaresMapBD.value
    .filter{case (k, v) => k == n}
    .map{case (k, v) => (n, k)}
    .take(1)
}

primes.flatMap(findSquare)
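
As a usage sketch (the collect() call is my addition and only sensible when the result is small), the pairs produced by findSquare can be brought back to the driver and printed much like the loop in the question:

// bring the small result back to the driver and print it there
primes
  .flatMap(findSquare)
  .collect()
  .foreach { case (p, k) => println("%d: %d".format(p, k)) }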

or Cartesian:

primes
  .cartesian(squaresRDD)
  .filter{case (n, (k, _)) => n == k}.map{case (n, (k, _)) => (n, k)}

Converting primes to dummy pairs (Int, null) and using join would be more efficient:

primes.map((_, null)).join(squaresRDD).map(...)

but based on your comments I assume you're interested in a scenario where there is no natural join condition.
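
If a key-based join does apply, one possible completion of that one-liner could look like the sketch below (the final map is my assumption about the desired (prime, square) output shape):

// join primes (as dummy (Int, null) pairs) with squaresRDD,
// then keep each prime together with its square
val primeSquares = primes
  .map((_, null))
  .join(squaresRDD)                       // RDD[(Int, (Null, Int))]
  .map { case (p, (_, sq)) => (p, sq) }   // RDD[(Int, Int)]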

Depending on the context, you can also consider using a database or files to store common data.

On a side note, RDDs are not iterable, so you cannot simply use a for loop. To do something like this you have to collect first or convert with toLocalIterator. You can also use the foreach method.
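
For reference, a quick sketch of those three options using the primes RDD from the question:

// 1. bring everything to the driver, then use a plain for loop
for (p <- primes.collect()) println(p)

// 2. stream partitions to the driver one at a time
for (p <- primes.toLocalIterator) println(p)

// 3. run the side effect on the executors instead of the driver
primes.foreach(p => println(p))   // output ends up in the executor logs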


* To be precise you cannot access SparkContext.

** Torrent broadcast and tree aggregates involve communication between executors so it is technically possible.

Upvotes: 4

axlpado - Agile Lab

Reputation: 353

RDDs are not serializable, so you can't use an RDD inside an RDD transformation. Also, I've never seen an RDD enumerated with a for statement; I usually use the foreach method that is part of the RDD API.

In order to combine data from two RDDs, you can leverage join, union, or broadcast (in case your RDD is small).
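
As a minimal sketch of the union option (the values below are just placeholders): union simply concatenates two RDDs of the same element type; it does not match records by key, which is what join is for.

val a = sc.parallelize(Seq(1, 2, 3))
val b = sc.parallelize(Seq(4, 5))
val combined = a.union(b)   // RDD[Int] containing 1, 2, 3, 4, 5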

Upvotes: -3
