a.moussa

Reputation: 3287

Weird assignment to a variable in foreachPartition

I expect a transactionCount variable to be 100, but I get -1. I have an RDD that always has exactly one partition. The following code processes the RDD:

var transactionCount = -1

payment_rdd.foreachPartition { partitionOfRecords =>
  // this line assigns 100 to transactionCount, since there are 100 records
  // in my RDD and therefore in my partition
  transactionCount = partitionOfRecords.size
  partitionOfRecords.foreach { record =>
      // I process each record
  }
  try {
    // transactionCount is still 100 here

    //another process
  }
  catch {
    case _: Throwable => {
      // execution never reaches this branch
      log.error("Cannot process correctly")
      transactionCount = 0
    }
  }
}
return transactionCount

The function returns -1 instead of 100, and I can't understand why. Do you have any idea, or a better solution? Thanks.

Upvotes: 0

Views: 431

Answers (1)

Alper t. Turker

Reputation: 35229

When you execute this code, Spark:

  • Computes the closure.
  • Serializes each variable the closure requires and sends it to the executors.
  • Runs the code on the executors, where each executor modifies its own local copy of the deserialized variable.

The transactionCount on the driver is never updated, which is why you still see -1.

This is described and explained in the Spark programming guide.
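To make the copy semantics concrete, here is a minimal sketch that imitates the round trip with plain Java serialization instead of Spark. It is only a simulation: ClosureCopyDemo and shipToExecutor are hypothetical names, and a one-element array stands in for the captured var.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object ClosureCopyDemo {
  // Round-trips a value through Java serialization. This only imitates what
  // happens to captured variables when a task is shipped to an executor.
  def shipToExecutor[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Returns (value on the "driver", value on the "executor" copy).
  def run(): (Int, Int) = {
    val driverCount = Array(-1)                      // stands in for `var transactionCount = -1`
    val executorCount = shipToExecutor(driverCount)  // deserialized copy
    executorCount(0) = 100                           // the "executor" updates only its copy
    (driverCount(0), executorCount(0))
  }
}
```

Here ClosureCopyDemo.run() yields (-1, 100): the write lands on the deserialized copy, and the original value is left untouched.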

Also, you cannot do this:

transactionCount = partitionOfRecords.size

An Iterator can be traversed only once, so partitionOfRecords will be empty after you compute its size, and the foreach that follows will process nothing.
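The single-pass behaviour can be checked without Spark. The sketch below uses a plain Scala Iterator[Int] in place of partitionOfRecords; the object and method names are hypothetical. The second method shows one possible workaround, counting while iterating.

```scala
object IteratorSizeDemo {
  // Calling size consumes the iterator, so nothing is left for later processing.
  // Returns (computed size, number of elements still available afterwards).
  def sizeThenRemaining(records: Iterator[Int]): (Int, Int) = {
    val count = records.size
    val remaining = records.toList.size
    (count, remaining)
  }

  // Counts records during the single traversal instead of calling size first.
  def processAndCount(records: Iterator[Int])(process: Int => Unit): Int = {
    var count = 0
    records.foreach { record =>
      process(record)
      count += 1
    }
    count
  }
}
```

For example, IteratorSizeDemo.sizeThenRemaining(Iterator(1, 2, 3)) gives (3, 0). The count from processAndCount is still local to whichever JVM runs it, so in Spark it would not reach the driver on its own.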

I'd use Try and an accumulator:

import scala.util.Try

val transactionCount = spark.sparkContext.longAccumulator

rdd.foreach { record =>
  // count the record only if processing it did not throw
  if (Try {
    // your code goes here
  }.isSuccess) transactionCount.add(1L)
}
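For completeness, here is a sketch of the whole round trip, including reading the result back on the driver through the accumulator's value. It assumes Spark is on the classpath and runs in local mode; AccumulatorDemo, the sample data (1 to 100 in a single partition), and the require call standing in for the real processing are all assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession
import scala.util.Try

object AccumulatorDemo {
  // Counts how many records were processed without throwing.
  def countSuccesses(): Long = {
    val spark = SparkSession.builder()
      .master("local[1]")             // assumption: local mode, for illustration only
      .appName("accumulator-demo")
      .getOrCreate()
    try {
      // Hypothetical stand-in for payment_rdd: 100 records in one partition.
      val rdd = spark.sparkContext.parallelize(1 to 100, numSlices = 1)
      val transactionCount = spark.sparkContext.longAccumulator("transactionCount")

      rdd.foreach { record =>
        // require(...) is a placeholder for the real per-record processing.
        if (Try { require(record > 0) }.isSuccess) transactionCount.add(1L)
      }

      // Read the merged result on the driver, after the action has finished.
      transactionCount.value
    } finally spark.stop()
  }
}
```

The accumulator is read only after foreach returns, because updates made on the executors are merged back into the driver's copy as tasks complete.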

Upvotes: 2
