Reputation: 944
I have a use case where I need to read data from Hive tables (Parquet), convert Timestamp columns to a certain format and write the output as csv.
For the date format thing, I want to write a function that takes a StructField and returns either the original field name or date_format($"col_name", "dd-MMM-yyyy hh.mm.ss a") if the dataType is TimestampType. This is what I have come up with so far:
def main(args: Array[String]): Unit = {
  val hiveSchema = args(0)
  val hiveTable = args(1)
  val myDF = spark.table(s"${hiveSchema}.${hiveTable}")
  val colArray = myDF.schema.fields.map(getColumns)
  val colString = colArray.mkString(",")
  myDF.select(colString).write.format("csv").mode("overwrite").option("header", "true").save("/tmp/myDF")
}
def getColumns(structField: StructField): String = structField match {
  case structField if structField.dataType.simpleString.equalsIgnoreCase("TimestampType") =>
    s"""date_format($$"${structField.name}", "dd-MMM-yy hh.mm.ss a")"""
  case _ => structField.name
}
But I get the following error at runtime:
org.apache.spark.sql.AnalysisException: cannot resolve '`date_format($$"my_date_col", "dd-MMM-yy hh.mm.ss a")`' given input columns [mySchema.myTable.first_name, mySchema.myTable.my_date_col];
Is there a better way to do this?
Upvotes: 0
Views: 819
Reputation: 42422
Remove the double dollar sign and the quotes around the column name. Also, there is no need for mkString; just use selectExpr:
def main(args: Array[String]): Unit = {
  val hiveSchema = args(0)
  val hiveTable = args(1)
  val myDF = spark.table(s"${hiveSchema}.${hiveTable}")
  val colArray = myDF.schema.fields.map(getColumns)
  myDF.selectExpr(colArray: _*).write.format("csv").mode("overwrite").option("header", "true").save("/tmp/myDF")
}
def getColumns(structField: StructField): String = structField match {
  case structField if structField.dataType == TimestampType =>
    s"date_format(${structField.name}, 'dd-MMM-yy hh.mm.ss a') as ${structField.name}"
  case _ => structField.name
}
Upvotes: 1