Sean

Reputation: 215

Spark - Why does the ArrayBuffer seem to get elements that haven't been traversed yet?

Why does the ArrayBuffer inside mapPartitions seem to contain elements that have not been traversed yet?

For instance, the way I read this code, the first item should have 1 element, the second 2, the third 3, and so on. How could the first ArrayBuffer in the output have 9 items? That would seem to imply there were 9 iterations before the first output, but the yields count makes it clear that this was the first iteration.

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row

val a = ArrayBuffer[Int]()
for (i <- 1 to 9) a += i
for (i <- 1 to 9) a += 9 - i
val rdd1 = sc.parallelize(a.toArray)

def timePivotWithLoss(iter: Iterator[Int]) : Iterator[Row] = {
    val currentArray = ArrayBuffer[Int]()
    var loss = 0
    var yields = 0
    for (item <- iter) yield {
        currentArray += item
        //var left : Int = -1
        yields += 1
        Row(yields, item.toString(), currentArray)
    }
}

rdd1.mapPartitions(it => timePivotWithLoss(it)).collect()

Output -

[1,1,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)]
[2,2,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)]
[3,3,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)]
[4,4,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)]
[5,5,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)]
[6,6,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)]
[7,7,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)]
[8,8,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)]
[9,9,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)]
[1,8,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)]
[2,7,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)]
[3,6,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)]
[4,5,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)]
[5,4,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)]
[6,3,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)]
[7,2,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)]
[8,1,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)]
[9,0,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)]

Upvotes: 0

Views: 267

Answers (1)

Alper t. Turker

Reputation: 35229

This happens because all of the rows in the partition hold a reference to the same mutable object. Spilling to disk could make it even less deterministic, since some objects may be serialized before later mutations and therefore not reflect the changes.
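
Here is a minimal plain-Scala sketch (no Spark involved) of the same effect: every yielded value keeps a reference to the one shared ArrayBuffer, so once the iterator has been fully consumed, all of them show its final contents.

import scala.collection.mutable.ArrayBuffer

val buf = ArrayBuffer[Int]()
val rows = (1 to 3).iterator.map { i =>
  buf += i
  (i, buf)        // the buffer itself is captured, not a snapshot of it
}.toList          // consuming the iterator mutates buf three times

rows.foreach(println)
// (1,ArrayBuffer(1, 2, 3))
// (2,ArrayBuffer(1, 2, 3))
// (3,ArrayBuffer(1, 2, 3))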

You can use a mutable reference to an immutable object instead:

def timePivotWithLoss(iter: Iterator[Int]): Iterator[Row] = {
  var currentArray = Vector[Int]()   // immutable Vector behind a mutable var
  var loss = 0
  var yields = 0
  for (item <- iter) yield {
    currentArray = currentArray :+ item  // :+ builds a new Vector; already-yielded Rows keep their own version
    yields += 1
    Row(yields, item.toString, currentArray)
  }
}

but in general, mutable state and Spark are not a good match.
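
If you want to keep the ArrayBuffer, a rough alternative sketch (the function name here is just illustrative) is to emit an immutable snapshot per row, e.g. currentArray.toList, so later mutations are not visible in already-yielded Rows:

def timePivotWithSnapshot(iter: Iterator[Int]): Iterator[Row] = {
  val currentArray = ArrayBuffer[Int]()
  var yields = 0
  for (item <- iter) yield {
    currentArray += item
    yields += 1
    Row(yields, item.toString, currentArray.toList) // snapshot, not the shared buffer
  }
}

rdd1.mapPartitions(timePivotWithSnapshot).collect().foreach(println)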

Upvotes: 1
