Reputation: 621
I need output where the first column contains the lexicographically smaller of the two strings (name1, name2) in each row.
I am getting this error: Exception in thread "main" org.apache.spark.SparkException: Task not serializable
I am trying to use UDFs to return the minimum and maximum of two strings, and to use them in a SQL statement:
"select minUDF(name1,name2), maxUDF(name1,name2) from friends"
I don't know where I am going wrong. Can somebody please help me find my mistake?
Input :                  Required Output :
+-----+-----+            +-----+-----+
|name1|name2|            |name1|name2|
+-----+-----+            +-----+-----+
| shir| amit|            | amit| shir|
| bane| shir|            | bane| shir|
| shir|  raj|            |  raj| shir|
| amit| shir|            | amit| shir|
| xiag| alan|            | alan| xiag|
| shir|  raj|            |  raj| shir|
+-----+-----+            +-----+-----+
```scala
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object test {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
    val sc = spark.sparkContext
    case class friends(name1: String, name2: String)
    val rdd = sc.parallelize(Seq(Row("shir", "amit"), Row("amit", "shir"), Row("raj", "shir"),
      Row("amit", "shir"), Row("raj", "shir"), Row("shir", "raj")))
    val schema = StructType(Array(
      StructField("name1", StringType, true),
      StructField("name2", StringType, true)
    ))
    val df = spark.createDataFrame(rdd, schema)
    df.show()
    val minfun = (str1: String, str2: String) => {
      if (str1.compareTo(str2) < 0)
        return str1
      else
        return str2
    }
    val maxfun = (str1: String, str2: String) => {
      if (str1.compareTo(str2) > 0)
        return str1
      else
        return str2
    }
    spark.udf.register("minUDF", minfun)
    spark.udf.register("maxUDF", maxfun)
    df.createOrReplaceTempView("friends")
    val ddd = spark.sql("select minUDF(name1,name2), maxUDF(name1,name2) from friends")
    ddd.show
    spark.stop()
  }
}
```
Upvotes: 0
Views: 200
Reputation: 695
Please do not write a UDF when there are built-in functions that do the same thing.
Use the SQL least and greatest functions to get the per-row minimum and maximum:
```scala
import org.apache.spark.sql.functions.least
import org.apache.spark.sql.functions.greatest

val ddd = spark.sql("select least(name1,name2), greatest(name1,name2) from friends")
ddd.show
```
Also, try to avoid UDFs in general: they are opaque to the Catalyst optimizer, and their inputs and outputs must be serialized and deserialized as rows are shipped to the worker nodes, which incurs extra cost.
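As a side note, the Task not serializable error in the question is most likely caused by the `return` statements inside the lambdas: in Scala, `return` inside an anonymous function is compiled into a NonLocalReturnControl exception tied to the enclosing method, so the closure captures the non-serializable enclosing scope. Writing the lambdas without `return` (the last expression is the result) avoids this. A minimal plain-Scala sketch, no Spark needed; the object name MinMaxLambdas is just for illustration:

```scala
object MinMaxLambdas {
  // Return-free lambdas: the if/else expression itself is the result.
  // The question's version used `return str1` / `return str2`, which in a
  // lambda compiles to a NonLocalReturnControl throw bound to the enclosing
  // method, making the closure non-serializable when Spark ships it out.
  val minFun: (String, String) => String =
    (s1, s2) => if (s1.compareTo(s2) < 0) s1 else s2

  val maxFun: (String, String) => String =
    (s1, s2) => if (s1.compareTo(s2) > 0) s1 else s2

  def main(args: Array[String]): Unit = {
    println(minFun("shir", "amit")) // amit
    println(maxFun("shir", "amit")) // shir
  }
}
```

Registering these return-free versions with spark.udf.register would work, but least/greatest remain the better choice here.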
Upvotes: 2