ps0604

Reputation: 1081

Iterate rows and columns in Spark dataframe

I have the following Spark dataframe that is created dynamically:

val sf1 = StructField("name", StringType, nullable = true)
val sf2 = StructField("sector", StringType, nullable = true)
val sf3 = StructField("age", IntegerType, nullable = true)

val fields = List(sf1,sf2,sf3)
val schema = StructType(fields)

val row1 = Row("Andy","aaa",20)
val row2 = Row("Berta","bbb",30)
val row3 = Row("Joe","ccc",40)

val data = Seq(row1,row2,row3)

val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")

Now I need to iterate over each row and column in sqlDF to print each column; this is my attempt:

sqlDF.foreach { row =>
  row.foreach { col => println(col) }
}

row is of type Row, but it is not iterable, which is why this code throws a compilation error at row.foreach. How can I iterate over each column in a Row?

Upvotes: 37

Views: 190073

Answers (9)

Carlos

Reputation: 1

My solution uses a for loop, since that is what I needed:

Solution 1:

// requires import spark.implicits._ for the case class encoder
case class campos_tablas(name: String, sector: String, age: Int)

for (row <- df.as[campos_tablas].take(df.count.toInt)) {
  print(row.name.toString)
}

Solution 2:

for (row <- df.take(df.count.toInt)) {
  print(row(0).toString)
}

Upvotes: 0

J_V

Reputation: 397

Let's assume resultDF is the DataFrame.

val resultDF = // DataFrame //
var itr = 0
val resultRow = resultDF.count
val resultSet = resultDF.collectAsList
var load_id = 0
var load_dt = ""
var load_hr = 0L

while (itr < resultRow) {
    load_id = resultSet.get(itr).getInt(0)
    load_dt = resultSet.get(itr).getString(1) // if the column holds a String value
    load_hr = resultSet.get(itr).getLong(2)   // if the column holds a Long value

    // Write other logic for your code //

    itr = itr + 1
}
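
The same loop can be written without the manual index bookkeeping; a minimal sketch of my own, assuming the first three columns hold Int, String and Long values as above:

resultDF.collect().foreach { row =>
  val load_id = row.getInt(0)
  val load_dt = row.getString(1)
  val load_hr = row.getLong(2)
  // write other logic for your code here
}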

Upvotes: -1

skjagini

Reputation: 3217

You should iterate over the partitions, which allows Spark to process the data in parallel, and then you can do a foreach on each row inside the partition.

You can further group the data in a partition into batches if need be:

val numberOfRowsPerBatch = 1000 // hypothetical batch size; tune for your use case

sqlDF.foreachPartition { partitionedRows: Iterator[Row] =>
  if (partitionedRows.hasNext) { // hasNext does not consume elements, unlike take(1)
    partitionedRows.grouped(numberOfRowsPerBatch).foreach { batch =>
      batch.foreach { row =>
        // ..... process each row here
      }
    }
  }
}
Upvotes: 2

Pooja Bhat

Reputation: 79

This worked fine for me:

sqlDF.collect().foreach(row => row.toSeq.foreach(col => println(col)))

Upvotes: 2

GANESH CHOKHARE

Reputation: 355

Simply collect the result and then apply foreach:

df.collect().foreach(println)

Upvotes: 0

Sarath Subramanian

Reputation: 21381

Consider that you have a DataFrame like the one below:

+-----+------+---+
| name|sector|age|
+-----+------+---+
| Andy|   aaa| 20|
|Berta|   bbb| 30|
|  Joe|   ccc| 40|
+-----+------+---+

To loop over your DataFrame and extract its elements, you can choose one of the approaches below.

Approach 1 - Loop using foreach

Looping over a DataFrame directly with a foreach loop and accessing typed fields is not possible. To do this, first define the schema of the DataFrame using a case class and then apply that schema to the DataFrame.

import spark.implicits._
import org.apache.spark.sql._
case class cls_Employee(name:String, sector:String, age:Int)
val df = Seq(cls_Employee("Andy","aaa", 20), cls_Employee("Berta","bbb", 30), cls_Employee("Joe","ccc", 40)).toDF()
df.as[cls_Employee].take(df.count.toInt).foreach(t => println(s"name=${t.name},sector=${t.sector},age=${t.age}"))

Please see the result below:

name=Andy,sector=aaa,age=20
name=Berta,sector=bbb,age=30
name=Joe,sector=ccc,age=40

Approach 2 - Loop using rdd

Use rdd.collect on top of your DataFrame. The row variable will then contain each row of the RDD, of type Row. To get the elements from a row, use row.mkString(","), which gives the row's values as a comma-separated string. Using the split function (a built-in function) you can then access each column value by index.

for (row <- df.rdd.collect) {
    val fields = row.mkString(",").split(",")
    val name   = fields(0)
    val sector = fields(1)
    val age    = fields(2)
}

Note that there are two drawbacks to this approach:
1. If there is a , in a column value, the data will be wrongly split across adjacent columns.
2. rdd.collect is an action that returns all the data to the driver's memory, and the driver's memory might not be large enough to hold it, ending with the application failing.
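
If you do stay with rdd.collect, a safer variant (my sketch, not part of the original answer) reads the fields by name with getAs, which avoids the string splitting and drawback 1 entirely:

for (row <- df.rdd.collect) {
  val name   = row.getAs[String]("name")
  val sector = row.getAs[String]("sector")
  val age    = row.getAs[Int]("age")
  println(s"$name, $sector, $age")
}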

I would recommend using Approach 1.

Approach 3 - Using where and select

You can directly use where and select, which will internally loop over and find the data. To avoid an IndexOutOfBoundsException when no rows match, an if condition is used:

var name = ""
val matches = df.where($"name" === "Andy").select(col("name")).collect()
if (matches.length >= 1)
    name = matches(0).get(0).toString

Approach 4 - Using temp tables

You can register the DataFrame as a temp table, which will be stored in Spark's memory. Then you can use a select query, as with any other database, to query the data, then collect it and save it in a variable:

df.registerTempTable("student")
val name = sqlContext.sql("select name from student where name = 'Andy'")
  .collect()(0).toString().replace("[", "").replace("]", "")
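
Note that registerTempTable is deprecated since Spark 2.0 in favour of createOrReplaceTempView; an equivalent sketch with the current API, reading the value with getString instead of stripping brackets:

df.createOrReplaceTempView("student")
val name = spark.sql("select name from student where name = 'Andy'")
  .collect()(0).getString(0)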

Upvotes: 34

Naresh Joshi

Reputation: 4597

sqlDF.foreach did not work for me, but Approach 1 from @Sarath Avanavu's answer does; however, it also sometimes shuffled the order of the records.

I found one more way that works:

df.collect().foreach { row =>
   println(row.mkString(","))
}

Upvotes: 6

Raphael Roth

Reputation: 27373

You should use mkString on your Row:

sqlDF.foreach { row =>
  println(row.mkString(",")) 
}

But note that this will be printed inside the executors' JVMs, so normally you won't see the output (unless you run with master = local).
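
If you just want to see the rows on the driver, collect them first; a minimal sketch:

sqlDF.collect().foreach(row => println(row.mkString(",")))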

Upvotes: 7

SCouto

Reputation: 7926

You can convert a Row to a Seq with toSeq. Once turned into a Seq you can iterate over it as usual with foreach, map, or whatever you need.

sqlDF.foreach { row =>
  row.toSeq.foreach { col => println(col) }
}

Output (note that the order is not deterministic, since the rows are printed in parallel on the executors):

Berta
bbb
30
Joe
Andy
aaa
20
ccc
40

Upvotes: 23
