user3725190

Reputation: 343

How can I iterate Spark's DataFrame rows?

I need to iterate over DataFrame rows.

I don't want to convert it into an RDD and filter the desired row each time, e.g.:

var index = 0
def next = {
 val result = df.rdd.zipWithIndex.filter(_._2 == index).collect.map(_._1).headOption
 index += 1
 result
}

There is an option to call the "collect" method, which will return Array[Row], and iterate over it, but I believe it will not hold up with a big amount of data.

val rowsIterator:Iterator[Row] = df.collect().iterator
rowsIterator.next

UPDATE: I was asked to give more information: I wish to write each row to my DB (in my case ES), but I want to do it with backpressure in order to make the system more stable.

Upvotes: 3

Views: 5131

Answers (1)

Raphael Roth

Reputation: 27373

Well, you could do something like this:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, row_number}
import ss.implicits._

val df = ss.range(10000).toDF("i")

// assign a row number to every row, then cache so repeated lookups are cheap
val dfEnumerated = df
      .withColumn("rnb", row_number().over(Window.orderBy(lit(1))))
      .cache()

val collectRnb = (rnb: Int) => dfEnumerated.where($"rnb" === rnb).drop($"rnb").collect.headOption.map(_.getLong(0))
val records: Iterator[Option[Long]] = Iterator.from(1).map(collectRnb)

def next = records.next

But this also becomes problematic when dealing with very large datasets, because I used a window function (row_number) without specifying a partitioning, so it does not scale very well.

You could also use a combination of different methods, e.g. collect one partition at a time and iterate through that array, as sketched below.
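One possible sketch of that idea, using the df defined above (filtering on the partition index with mapPartitionsWithIndex is just one way to pull back a single partition per job):

import org.apache.spark.sql.Row

// collect one partition per Spark job, so the driver only ever holds
// a single partition's rows in memory at a time
val numPartitions = df.rdd.getNumPartitions

val partitionWise: Iterator[Row] = (0 until numPartitions).iterator.flatMap { p =>
  df.rdd
    .mapPartitionsWithIndex((idx, it) => if (idx == p) it else Iterator.empty)
    .collect()
}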

Edit:

Since Spark 2.0, you can use .toLocalIterator() which will collect your data partition-wise:

Return an iterator that contains all of Rows in this Dataset. The iterator will consume as much memory as the largest partition in this Dataset
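A minimal usage sketch (note that in Scala, toLocalIterator() returns a java.util.Iterator):

import org.apache.spark.sql.Row

val localRows: java.util.Iterator[Row] = df.toLocalIterator()
while (localRows.hasNext) {
  val row = localRows.next()
  // handle one row at a time, e.g. write it to your DB
}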

Upvotes: 2
