Reputation: 343
I need to iterate over DataFrame rows.
I don't want to convert it into an RDD and filter the desired row each time, e.g.:
var index = 0
def next = {
  val result = df.rdd.filter(_._2 == index).collect.map(_._1).headOption
  index += 1
  result
}
There is the option to call the "collect" method, which returns Array[Row], and iterate over it, but I believe it will not hold up with a large amount of data.
val rowsIterator: Iterator[Row] = df.collect().iterator
rowsIterator.next
UPDATE: I was asked to give more information: I wish to write each row to my DB (in my case ES), and I want to do it with backpressure in order to make the system more stable.
Upvotes: 3
Views: 5131
Reputation: 27373
Well, you could do something like this:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, row_number}
import ss.implicits._

val df = ss.range(10000).toDF("i")
// add a global row number column "rnb"
val dfEnumerated = df
  .withColumn("rnb", row_number().over(Window.orderBy(lit(1))))
  .cache()
// collect one row at a time by its row number
val collectRnb = (rnb: Int) => dfEnumerated.where($"rnb" === rnb).drop($"rnb").collect.headOption.map(_.getLong(0))
val records: Iterator[Option[Long]] = Iterator.from(1).map(collectRnb)
def next = records.next
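Each call to next then pulls exactly one value from the cluster (the order is arbitrary here, since the window is ordered by a constant):
next // e.g. Some(0)
next // e.g. Some(1)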
But this also becomes problematic when dealing with very large datasets, because I used a window function (row_number) without specifying a partitioning, so it does not scale very well.
You could also use a combination of different methods, e.g. collect one partition at a time and iterate through that array.
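For example, a rough sketch of the one-partition-at-a-time idea using SparkContext.runJob (ss is the SparkSession from above; the per-row processing is just a placeholder):
import org.apache.spark.sql.Row

val rdd = df.rdd
val partitionWise: Iterator[Row] =
  Iterator.range(0, rdd.getNumPartitions).flatMap { p =>
    // run a job on just partition p and collect its rows to the driver
    ss.sparkContext.runJob(rdd, (it: Iterator[Row]) => it.toArray, Seq(p)).head
  }
partitionWise.take(3).foreach(println) // process rows one at a time on the driver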
Edit:
Since Spark 2.0, you can use .toLocalIterator()
which will collect your data partition-wise:
Return an iterator that contains all of Rows in this Dataset. The iterator will consume as much memory as the largest partition in this Dataset
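With that, the update in the question (writing each row to ES) could look roughly like this; writeToEs is just a hypothetical placeholder for the actual Elasticsearch write:
import org.apache.spark.sql.Row
import scala.collection.JavaConverters._

def writeToEs(row: Row): Unit = ??? // placeholder for the actual ES write

// toLocalIterator returns a java.util.Iterator[Row] and pulls one partition at a time
df.toLocalIterator().asScala.foreach(writeToEs)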
Upvotes: 2