Reputation: 596
Since I am a bit new to Spark Scala, I am finding it difficult to iterate through a DataFrame.
My dataframe contains two columns, one called path and the other called ingestiontime.
Example -
Now I want to iterate through this dataframe and use the data in the path and ingestiontime columns to prepare Hive queries and run them, such that the queries that are run look like -
ALTER TABLE <hiveTableName> ADD PARTITION (ingestiontime=<Ingestiontime_From_the_DataFrame_ingestiontime_column>) LOCATION (<Path_From_the_dataFrames_path_column>)
To achieve this, I used -
allOtherIngestionTime.collect().foreach { row =>
  val prepareHiveQuery = "ALTER TABLE myhiveTable ADD PARTITION (ingestiontime = " +
    row.mkString("<SomeCustomDelimiter>").split("<SomeCustomDelimiter>")(1) +
    ") LOCATION ( " + row.mkString("<SomeCustomDelimiter>").split("<SomeCustomDelimiter>")(0) + ")"
  spark.sql(prepareHiveQuery)
}
But I feel this can be very dangerous, e.g. when my data contains that same delimiter. I am very interested in finding other ways of iterating through the rows/columns of a DataFrame.
Upvotes: 1
Views: 3355
Reputation: 10362
Check the code below.
import org.apache.spark.sql.functions.{col, concat_ws, lit}
import spark.implicits._ // needed for .as[String]

df
  .withColumn("query", concat_ws("",
    lit("ALTER TABLE myhiveTable ADD PARTITION (ingestiontime="), col("ingestiontime"),
    lit(") LOCATION (\""), col("path"), lit("\")")))
  .select("query")
  .as[String]
  .collect
  .foreach(q => spark.sql(q))
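An equivalent way to build the same statements (a sketch, assuming both columns are strings) is to select the two columns as a typed Dataset and interpolate:

import spark.implicits._

df.select("path", "ingestiontime")
  .as[(String, String)]
  .collect()
  .foreach { case (path, ingestiontime) =>
    // build the same ALTER TABLE statement with plain string interpolation
    spark.sql(s"""ALTER TABLE myhiveTable ADD PARTITION (ingestiontime=$ingestiontime) LOCATION ("$path")""")
  }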
Upvotes: 2
Reputation: 5686
In order to access your columns path and ingestiontime you can use row.getString(0) and row.getString(1).
val allOtherIngestionTime: DataFrame = ???

// Collect first: spark.sql can only be called on the driver, not
// inside an executor-side foreach.
allOtherIngestionTime.collect().foreach { row =>
  val prepareHiveQuery = "ALTER TABLE myhiveTable ADD PARTITION (ingestiontime = " +
    row.getString(1) + ") LOCATION ( " + row.getString(0) + ")"
  spark.sql(prepareHiveQuery)
}
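A small variant (a sketch, assuming both columns are strings): fetch the fields by column name rather than by position, so the code keeps working if the column order changes.

allOtherIngestionTime.collect().foreach { row =>
  // getAs resolves by column name, so the select order no longer matters
  val path = row.getAs[String]("path")
  val ingestiontime = row.getAs[String]("ingestiontime")
  spark.sql(s"ALTER TABLE myhiveTable ADD PARTITION (ingestiontime = $ingestiontime) LOCATION ( $path )")
}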
If you use Datasets instead of DataFrames you will be able to use row.path and row.ingestionTime in an easier way.
case class myCaseClass(path: String, ingestionTime: String)

val ds: Dataset[myCaseClass] = ???

// Again, collect first so spark.sql runs on the driver.
ds.collect().foreach { row =>
  val prepareHiveQuery = "ALTER TABLE myhiveTable ADD PARTITION (ingestiontime = " +
    row.ingestionTime + ") LOCATION ( " + row.path + ")"
  spark.sql(prepareHiveQuery)
}
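To get such a Dataset from the original DataFrame, a minimal sketch (assuming string columns path and ingestiontime; with Spark's default case-insensitive column resolution, the ingestiontime column maps onto the ingestionTime field):

import spark.implicits._ // provides the encoder for myCaseClass

val ds: Dataset[myCaseClass] = allOtherIngestionTime.as[myCaseClass]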
In any case, to iterate over a DataFrame or a Dataset you can use foreach, or map if you want to convert the content into something else.
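For example, a brief sketch of the map variant, building all of the query strings first and then running them on the driver:

import spark.implicits._ // encoder for the resulting Dataset[String]

val queries: Array[String] = ds.map { row =>
  "ALTER TABLE myhiveTable ADD PARTITION (ingestiontime = " +
    row.ingestionTime + ") LOCATION ( " + row.path + ")"
}.collect()

queries.foreach(q => spark.sql(q))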
Also, collect() brings all the data to the driver, which is not recommended for large results; it is needed here only because spark.sql must be called on the driver. For per-row work that does not need the SparkSession, prefer foreach or map without collect().
If what you want is to iterate over the row fields, you can turn the row into a Seq and iterate:
row.toSeq.foreach{column => ...}
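For instance, a small sketch pairing each value with its column name through the row's schema:

row.schema.fieldNames.zip(row.toSeq).foreach { case (name, value) =>
  println(s"$name = $value")
}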
Upvotes: 1