Reputation: 107
I am creating a function in Scala that I want to use in my Spark SQL queries. The query works fine in Hive, and also if I run it directly in Spark SQL, but I use the same query in multiple places, so I want to turn it into a reusable function/method that I can call whenever it is required. I have created the function below in my Scala class.
def date_part(date_column: Column) = {
  val m1: Column = month(to_date(from_unixtime(unix_timestamp(date_column, "dd-MM-yyyy")))) // gives value as 01, 02, ...etc
  m1 match {
    case 01 => concat(concat(year(to_date(from_unixtime(unix_timestamp(date_column, "dd-MM-yyyy")))) - 1, '-'), substr(year(to_date(from_unixtime(unix_timestamp(date_column, "dd-MM-yyyy")))), 3, 4))
    // etc..
    case _ => "some other logic"
  }
}
but it is showing multiple errors:
◾ Decimal integer literals may not have a leading zero. (Octal syntax is obsolete.)
◾ type mismatch; found : Int(0) required: org.apache.spark.sql.Column
◾ type mismatch; found : Char('-') required: org.apache.spark.sql.Column
◾ not found: value substr
Also, if I create even a simple function with Column as the parameter type, I am not able to register it, because I get an error saying this is not possible in columnar format. For all primitive data types (String, Long, Int) it works fine, but in my case the type is Column, so I cannot do this. Can someone please guide me on how I should do this? So far I have found on Stack Overflow that I need to apply this function to a DataFrame and then convert that DataFrame to a temp table. Can someone please suggest any other alternative, so that I can use this functionality without many changes to my existing code?
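For reference, this is roughly the temp-table workaround I found (a minimal sketch; df, date_column and my_table are placeholder names for my own DataFrame, column and view):

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

// reusable Column => Column helper (simplified logic)
def datePart(dateColumn: Column): Column =
  month(to_date(from_unixtime(unix_timestamp(dateColumn, "dd-MM-yyyy"))))

// apply it on a DataFrame first, then expose the result to Spark SQL as a temp view
val withPart = df.withColumn("date_part", datePart(col("date_column")))
withPart.createOrReplaceTempView("my_table")
spark.sql("select date_part from my_table").show()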
Upvotes: 0
Views: 233
Reputation: 312
Firstly, Spark will need to read the file where the data is stored. I guess this file is a CSV, but you can use the json method instead of csv.
Then you can add new columns with a calculated value, as follows:
import org.apache.spark.sql.functions._
val df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("/path/mydata.csv")
import org.apache.spark.sql.DataFrame

def transformDate(dateColumn: String, df: DataFrame): DataFrame = {
  // date parsed from the "dd-MM-yyyy" string column
  val parsed = to_date(from_unixtime(unix_timestamp(col(dateColumn), "dd-MM-yyyy")))
  df.withColumn("calculatedCol", month(parsed)) // month number: 1, 2, ...
    .withColumn("newColumnWithDate",
      when(col("calculatedCol") === 1, // e.g. "2018-19" for a January 2019 date
        concat((year(parsed) - 1).cast("string"), lit("-"), substring(year(parsed).cast("string"), 3, 2)))
        .when(col("calculatedCol") === 2, lit("some other logic"))
        .otherwise(lit("nothing match")))
}
// calling your function on the DataFrame whose date column you want to transform:
val transformed = transformDate("date_column", df)
Note that some functions need a Column as argument, not a plain value, so use lit() to wrap such values.
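For example (firstName and lastName are just placeholder column names):

// concat takes Column arguments, so the literal separator has to be wrapped in lit()
df.withColumn("label", concat(col("firstName"), lit("-"), col("lastName")))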
A UDF is not needed (and in terms of performance it is not recommendable), but you can use one in the following way:
val upper: String => String = _.toUpperCase
import org.apache.spark.sql.functions.udf
val upperUDF = udf(upper)
df.withColumn("upper", upperUDF('text)).show
Here, the upper function is where you would put the logic to transform the date column.
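As a sketch of what that logic could look like for the date column from the question, written as a plain String => String function (the exact output format here is only an assumption):

import java.time.LocalDate
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.functions.{col, udf}

// parse "dd-MM-yyyy" and build something like "2018-19" for January dates
val datePart: String => String = { s =>
  val d = LocalDate.parse(s, DateTimeFormatter.ofPattern("dd-MM-yyyy"))
  if (d.getMonthValue == 1) s"${d.getYear - 1}-${d.getYear.toString.takeRight(2)}"
  else "some other logic"
}

val datePartUDF = udf(datePart)
df.withColumn("datePart", datePartUDF(col("date_column"))).show()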
Upvotes: 0
Reputation: 10362
Try the code below.
scala> import org.joda.time.format._
import org.joda.time.format._
scala> spark.udf.register("datePart",(date:String) => DateTimeFormat.forPattern("MM-dd-yyyy").parseDateTime(date).toString(DateTimeFormat.forPattern("MMyyyy")))
res102: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
scala> spark.sql("""select datePart("03-01-2019") as datepart""").show
+--------+
|datepart|
+--------+
| 032019|
+--------+
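If you also need the same logic on a DataFrame rather than only in a SQL string, the registered function can be reused with callUDF (date_column is a placeholder column name):

import org.apache.spark.sql.functions.{callUDF, col}

df.withColumn("datepart", callUDF("datePart", col("date_column"))).show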
Upvotes: 0