Golokesh Patra

Reputation: 596

Iterate Through Rows of a Dataframe

Since I am a bit new to Spark Scala, I am finding it difficult to iterate through a DataFrame. My dataframe contains 2 columns, one is path and the other is ingestiontime. Example -

(screenshot: sample rows showing path and ingestiontime values)

Now I want to iterate through this dataframe, use the data in the path and ingestiontime columns to prepare a Hive query, and run it, such that the queries that are run look like -

ALTER TABLE <hiveTableName> ADD PARTITION (ingestiontime=<Ingestiontime_From_the_DataFrame_ingestiontime_column>) LOCATION '<Path_From_the_dataFrames_path_column>'

To achieve this, I used -

allOtherIngestionTime.collect().foreach { row =>
  // Join the row's fields with a custom delimiter, then split them apart again
  // to pull out the individual column values by position.
  val fields = row.mkString("<SomeCustomDelimiter>").split("<SomeCustomDelimiter>")
  val prepareHiveQuery = "ALTER TABLE myhiveTable ADD PARTITION (ingestiontime = " + fields(1) + ") LOCATION '" + fields(0) + "'"
  spark.sql(prepareHiveQuery)
}

But I feel this can be very dangerous, e.g. when my data itself contains the same delimiter. I am very interested in finding out other ways of iterating through the rows/columns of a DataFrame.

Upvotes: 1

Views: 3355

Answers (2)

s.polam

Reputation: 10362

Check the code below.

import org.apache.spark.sql.functions.{col, concat_ws, lit}
import spark.implicits._ // needed for .as[String]

df
  .withColumn("query", concat_ws("",
    lit("ALTER TABLE myhiveTable ADD PARTITION (ingestiontime="), col("ingestiontime"),
    lit(") LOCATION \""), col("path"), lit("\"")))
  .select("query")
  .as[String]
  .collect
  .foreach(q => spark.sql(q))
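With hypothetical sample values such as ingestiontime=20200601 and path=/data/landing/20200601 (the real values come from your dataframe), each collected string would read:

ALTER TABLE myhiveTable ADD PARTITION (ingestiontime=20200601) LOCATION "/data/landing/20200601"

i.e. the whole statement is assembled as a column on the executors, and only the finished strings are collected and run on the driver.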

Upvotes: 2

Javier Montón

Reputation: 5686

In order to access your columns path and ingestiontime you can use row.getString(0) and row.getString(1).

DataFrames

val allOtherIngestionTime: DataFrame = ???

allOtherIngestionTime.foreach { row =>
  val prepareHiveQuery = "ALTER TABLE myhiveTable ADD PARTITION (ingestiontime = " + row.getString(1) + ") LOCATION '" + row.getString(0) + "'"
  spark.sql(prepareHiveQuery)
}
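If you prefer not to rely on column positions (and to avoid the delimiter trick from the question entirely), a small variant, assuming the same column names as in the question, reads the fields by name with getAs; here the rows are collected first so that spark.sql runs on the driver:

allOtherIngestionTime.collect().foreach { row =>
  // Fetch each field by its column name instead of by position.
  val path = row.getAs[String]("path")
  val ingestionTime = row.getAs[String]("ingestiontime")
  spark.sql(s"ALTER TABLE myhiveTable ADD PARTITION (ingestiontime = $ingestionTime) LOCATION '$path'")
}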

Datasets

If you use Datasets instead of DataFrames, you can access the fields as row.path and row.ingestionTime, which is easier.

case class myCaseClass(path: String, ingestionTime: String)

val ds: Dataset[myCaseClass] = ???

ds.foreach { row =>
  val prepareHiveQuery = "ALTER TABLE myhiveTable ADD PARTITION (ingestiontime = " + row.ingestionTime + ") LOCATION '" + row.path + "'"
  spark.sql(prepareHiveQuery)
}

In any case, to iterate over a DataFrame or a Dataset you can use foreach, or map if you want to transform the content into something else.

Also, collect() brings all the data to the driver, which is not recommended; you could use foreach or map without collect().
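As a minimal sketch of the map route (assuming the same allOtherIngestionTime dataframe as above): map turns each row into its query string on the executors, and since spark.sql itself has to be called on the driver, only those short strings are collected before running them:

import spark.implicits._ // provides the Encoder[String] that map needs

val queries = allOtherIngestionTime.map { row =>
  // Build one ALTER TABLE statement per row.
  s"ALTER TABLE myhiveTable ADD PARTITION (ingestiontime = ${row.getString(1)}) LOCATION '${row.getString(0)}'"
}
queries.collect().foreach(q => spark.sql(q))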

If what you want is to iterate over the fields of a row, you can turn it into a Seq and iterate:

row.toSeq.foreach{column => ...}
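For example, a sketch that just prints every field of every row (println standing in for whatever per-field logic you need):

allOtherIngestionTime.collect().foreach { row =>
  // row.toSeq exposes all of the row's fields as a Seq[Any].
  row.toSeq.foreach(column => println(column))
}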

Upvotes: 1
