Reputation: 33
I have a UDF that filters and selects values from a dataframe, but it runs into an "object not serializable" error. Details below.
Suppose I have a dataframe df1 with columns named ("ID", "Y1", "Y2", "Y3", "Y4", "Y5", "Y6", "Y7", "Y8", "Y9", "Y10"). I want to sum a subset of the "Y" columns based on the matching "ID" and "Value" from another dataframe df2. I tried the following:
val y_list = Seq("Y1", "Y2", "Y3", "Y4", "Y5", "Y6", "Y7", "Y8", "Y9", "Y10").map(c => col(c))
def udf_test(ID: String, value: Int): Double = {
df1.filter($"ID" === ID).select(y_list:_*).first.toSeq.toList.take(value).foldLeft(0.0)(_+_)
}
sqlContext.udf.register("udf_test", udf_test _)
val df_result = df2.withColumn("Result", callUDF("udf_test", $"ID", $"Value"))
This gives me errors of the form:
java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
- object not serializable (class: org.apache.spark.sql.Column, value: Y1)
I looked this up and realized that Spark Column is not serializable. I am wondering:
1) Is there any way to manipulate a dataframe within a UDF?
2) If not, what's the best way to achieve the type of operation above? My real case is more complicated than this. It requires me to select values from multiple small dataframes based on some columns in a big dataframe, and write a computed value back to the big dataframe.
I am using Spark 1.6.3. Thanks!
Upvotes: 3
Views: 3067
Reputation: 54
import org.apache.spark.sql.functions._
val events = Seq (
(1,1,2,3,4),
(2,1,2,3,4),
(3,1,2,3,4),
(4,1,2,3,4),
(5,1,2,3,4)).toDF("ID","amt1","amt2","amt3","amt4")
var prev_amt5=0
var i=1
def getamt5value(ID:Int,amt1:Int,amt2:Int,amt3:Int,amt4:Int) : Int = {
if(i==1){
i=i+1
prev_amt5=0
}else{
i=i+1
}
if (ID == 0)
{
if(amt1==0)
{
val cur_amt5= 1
prev_amt5=cur_amt5
cur_amt5
}else{
val cur_amt5=1*(amt2+amt3)
prev_amt5=cur_amt5
cur_amt5
}
}else if (amt4==0 || (prev_amt5==0 && amt1==0)){
val cur_amt5=0
prev_amt5=cur_amt5
cur_amt5
}else{
val cur_amt5=prev_amt5 + amt2 + amt3 + amt4
prev_amt5=cur_amt5
cur_amt5
}
}
val getamt5 = udf {(ID:Int,amt1:Int,amt2:Int,amt3:Int,amt4:Int) =>
getamt5value(ID,amt1,amt2,amt3,amt4)
}
events.withColumn("amnt5", getamt5(events.col("ID"),events.col("amt1"),events.col("amt2"),events.col("amt3"),events.col("amt4"))).show()
Upvotes: 0
Reputation: 16076
You can't use Dataset operations inside UDFs. A UDF can only operate on the values of existing columns and produce one result column. It can't filter a Dataset or perform aggregations, but it can be used inside a filter. A UDAF can also aggregate values.
Instead, you can use .as[SomeCaseClass] to make a Dataset from the DataFrame and use normal, strongly typed functions inside filter, map, and reduce.
Edit: If you want to join your bigDF with every small DF in smallDFs List, you can do:
import org.apache.spark.sql.functions._
val bigDF = // some processing
val smallDFs = Seq(someSmallDF1, someSmallDF2)
val joined = smallDFs.foldLeft(bigDF)((acc, df) => acc.join(broadcast(df), "join_column"))
broadcast is a function that adds a broadcast hint to the small DF, so that the join uses the more efficient Broadcast Join instead of a Sort Merge Join.
Upvotes: 2
Reputation: 27373
1) No, you can only use plain Scala code within UDFs.
2) If I interpreted your code correctly, you can achieve your goal with:
df2
.join(
df1.select($"ID",y_list.foldLeft(lit(0))(_ + _).as("Result")),Seq("ID")
)
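To illustrate point 1, here is a minimal sketch of the question's per-row logic written as plain Scala, with no Column or DataFrame involved. It assumes df1 is small enough to be collected to the driver as a Map from ID to its Y1..Y10 values. The names LookupSketch, lookup and sumFirst are hypothetical, and the sample data is made up for the example.

```scala
object LookupSketch {
  // Hypothetical stand-in for df1 collected to the driver: ID -> Y1..Y10 values.
  val lookup: Map[String, Seq[Double]] = Map(
    "a" -> Seq(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0),
    "b" -> Seq(0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5)
  )

  // Plain Scala function: sums the first `value` Y entries for `id`.
  // Returning 0.0 for an unknown ID is an assumption; the question does not say
  // what should happen in that case.
  def sumFirst(id: String, value: Int): Double =
    lookup.get(id).map(_.take(value).sum).getOrElse(0.0)
}
```

Because a function like this only touches plain Scala values, it could probably be registered with sqlContext.udf.register in the same way as udf_test in the question. For larger lookup tables, the join shown above is likely the better fit.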
Upvotes: 0