Reputation: 557
scala/spark use udf function in spark shell for array manipulation in dataframe column
df.printSchema
root
 |-- x: timestamp (nullable = true)
 |-- date_arr: array (nullable = true)
 |    |-- element: timestamp (containsNull = true)
sample data:
|x | date_arr |
|---------------------- |---------------------------------------------------------------------- |
| 2009-10-22 19:00:00.0 | [2009-08-22 19:00:00.0, 2009-09-19 19:00:00.0, 2009-10-24 19:00:00.0] |
| 2010-10-02 19:00:00.0 | [2010-09-25 19:00:00.0, 2010-10-30 19:00:00.0] |
In udf.jar, I have this function to get the ceiling date in date_arr according to x:
import java.sql.Timestamp
import org.apache.hadoop.hive.ql.exec.UDF

class CeilToDate extends UDF {
  def evaluate(arr: Seq[Timestamp], x: Timestamp): Timestamp = {
    // latest timestamp in arr that falls before x (assumes arr is sorted ascending)
    arr.filter(_.before(x)).last
  }
}
Add the jar to spark-shell: spark-shell --jars udf.jar
In spark-shell, I have a HiveContext as val hc = new HiveContext(spc), and I create the function: hc.sql("create temporary function ceil_to_date as 'com.abc.udf.CeilToDate'")
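(The query below also assumes df has been registered as a temporary table named df; presumably something like this was run beforehand:)

df.registerTempTable("df")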
When I run the query hc.sql("select ceil_to_date(date_arr, x) as ceildate from df").show, I expect a dataframe like:
|ceildate |
|----------------------|
|2009-09-19 19:00:00.0 |
|2010-09-25 19:00:00.0 |
However, it throws this error:
org.apache.spark.sql.AnalysisException: No handler for Hive udf class com.abc.udf.CeilToDate because: No matching method for class com.abc.udf.CeilToDate with (array&lt;timestamp&gt;, timestamp). Possible choices: _FUNC_(struct&lt;&gt;, timestamp)
Upvotes: 0
Views: 2694
Reputation: 41957
Why go through all the complexity of creating a udf jar and including the jar in spark-shell? You can just create a udf in spark-shell and use it on your dataframe.
Given that you have the dataframe:
scala> df.show(false)
+---------------------+---------------------------------------------------------------------+
|x |date_arr |
+---------------------+---------------------------------------------------------------------+
|2009-10-22 19:00:00.0|[2009-08-22 19:00:00.0, 2009-09-19 19:00:00.0, 2009-10-24 19:00:00.0]|
|2010-10-02 19:00:00.0|[2010-09-25 19:00:00.0, 2010-10-30 19:00:00.0] |
+---------------------+---------------------------------------------------------------------+
You can create a udf function in spark-shell, but before that you would need three imports:
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> import java.sql.Timestamp
import java.sql.Timestamp
scala> import scala.collection._
import scala.collection._
Then you can create a udf function
scala> def ceil_to_date = udf((arr: mutable.WrappedArray[Timestamp], x: Timestamp) => arr.filter(_.before(x)).last)
ceil_to_date: org.apache.spark.sql.expressions.UserDefinedFunction
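Note that last throws an exception for a row where no element of date_arr is before x, and the filter/last approach assumes date_arr is sorted in ascending order. If that cannot be guaranteed, a defensive variant (a sketch, not part of the original function; ceil_to_date_safe is just an illustrative name) could return null for such rows instead:

def ceil_to_date_safe = udf((arr: mutable.WrappedArray[Timestamp], x: Timestamp) =>
  // latest timestamp before x, or null when no such element exists
  arr.filter(_.before(x)).lastOption.orNull
)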
Your desired output dataframe can be achieved through different methods, but you can simply use select as follows:
scala> df.select(ceil_to_date(col("date_arr"), col("x")).as("ceildate")).show(false)
+---------------------+
|ceildate |
+---------------------+
|2009-09-19 19:00:00.0|
|2010-09-25 19:00:00.0|
+---------------------+
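If you prefer the SQL syntax from the question, you can register the same logic with the HiveContext instead (a sketch, assuming the hc and df values from the question; the temp table name df is illustrative):

hc.udf.register("ceil_to_date", (arr: Seq[Timestamp], x: Timestamp) => arr.filter(_.before(x)).last)
df.registerTempTable("df")
hc.sql("select ceil_to_date(date_arr, x) as ceildate from df").show(false)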
I hope the answer is helpful.
Upvotes: 1