radumanolescu

Reputation: 4161

Spark iterate over dataframe rows, cells

(Spark beginner) I wrote the code below to iterate over the rows and columns of a data frame (Spark 2.4.0 + Scala 2.12). I have computed the row and cell counts as a sanity check. I was surprised to find that the method returns 0, even though the counters are incremented during the iteration.

To be precise: while the code is running, it prints messages showing that it has found rows (in multiples of 10, up to 610) and cells (in multiples of 100, up to 15800).

After the iteration is done, it prints "Found 0 cells", and returns 0.

I understand that Spark is a distributed processing engine, and that code is not executed exactly as written - but how should I think about this code?

The row/cell counts were just a sanity check; in reality I need to loop over the data and accumulate some results, but how do I prevent Spark from zeroing out my results as soon as the iteration is done?

  def processDataFrame(df: sql.DataFrame): Int = {
    var numRows = 0
    var numCells = 0
    df.foreach { row =>
      numRows += 1
      if (numRows % 10 == 0) println(s"Found row $numRows") // prints 10,20,...,610
      row.toSeq.foreach { c =>
        if (numCells % 100 == 0) println(s"Found cell $numCells") // prints 100,200,...,15800
        numCells += 1
      }
    }
    println(s"Found $numCells cells") // prints 0
    numCells
  }

Upvotes: 0

Views: 683

Answers (1)

Rajan Chauhan

Reputation: 1385

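Your counters stay at 0 because Spark serializes the closure passed to foreach and runs it on the executors, so each task increments its own copy of numRows and numCells; the variables on the driver are never updated. Spark has accumulator variables, which provide functionality such as counting in a distributed environment. You can use one of the built-in numeric accumulators (sc.longAccumulator or sc.doubleAccumulator), and a custom accumulator type can also be implemented quite easily.
In your code, changing the counting variables to accumulators as shown below will give you the correct result.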

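val sc = df.sparkSession.sparkContext  // SparkContext behind the DataFrame
val numRows = sc.longAccumulator("numRows Accumulator")  // the name is only for debugging (it shows up in the Spark UI)
val numCells = sc.longAccumulator("numCells Accumulator")
df.foreach { row =>
  numRows.add(1)
  // Inside a task, .value is only that task's partial count, so these progress messages are indicative only
  if (numRows.value % 10 == 0) println(s"Found row ${numRows.value}")
  row.toSeq.foreach { c =>
    if (numCells.value % 100 == 0) println(s"Found cell ${numCells.value}")
    numCells.add(1)
  }
}
// Read on the driver after the action has finished: prints the total cell count, not 0
println(s"Found ${numCells.value} cells")
numCells.value  // a java.lang.Long; convert it if the enclosing method must return Int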
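
As for how to think about the original code: the sketch below imitates, with plain Java serialization and no Spark at all, what happens to a variable captured by the closure given to foreach. It is only an illustration under that assumption, not what Spark literally executes; Counter, ClosureCopyDemo and shipToExecutor are hypothetical names made up for this example.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a variable captured by the closure passed to df.foreach.
final class Counter extends java.io.Serializable { var n: Int = 0 }

object ClosureCopyDemo {
  // Imitates shipping a task to an executor: serialize, then deserialize into a new object.
  def shipToExecutor[T <: java.io.Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val driverCounter = new Counter
    // The "executor" receives a deserialized copy, not the driver's object.
    val executorCounter = shipToExecutor(driverCounter)
    (1 to 610).foreach(_ => executorCounter.n += 1) // increments land on the copy only
    println(s"executor copy: ${executorCounter.n}")
    println(s"driver original: ${driverCounter.n}")
  }
}
```

The copy ends up at 610 while the original stays at 0, which matches what you observed: the println calls inside foreach see the task's copy, and the println after foreach sees the driver's untouched variable. Accumulators differ in that Spark sends each task's partial value back and merges it on the driver.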

Upvotes: 1
