Amit Nayak

Reputation: 621

SparkSQL-Scala Program: First column should contain the lowest string among (col1, col2) of each row

I need output where the first column contains the lowest string among (column1, column2) of each row.

I am getting this error: Exception in thread "main" org.apache.spark.SparkException: Task not serializable

I am trying to use UDFs to return the minimum and maximum of two strings, and use them in a SQL statement:

"select minUDF(name1,name2), maxUDF(name1,name2) from friends"

I don't know where I am going wrong. Can somebody please help me find my mistake?

Input :           Required Output :
+-----+-----+     +-----+-----+
|name1|name2|     |name1|name2|
+-----+-----+     +-----+-----+
| shir| amit|     | amit| shir|
| bane| shir|     | bane| shir|
| shir| raj |     |  raj| shir|
| amit| shir|     | amit| shir|
| xiag| alan|     | alan| xiag|
| shir|  raj|     |  raj| shir|
+-----+-----+     +-----+-----+

```scala
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object test {

  def main(args: Array[String]): Unit = {


    val spark=SparkSession.builder().appName("test").master("local[*]").getOrCreate();
    val sc=spark.sparkContext

    case class friends(name1:String,name2:String)
    val rdd=sc.parallelize(Seq(Row("shir","amit"),Row("amit","shir"),Row("raj","shir"),Row("amit","shir"),Row("raj","shir"),Row("shir","raj")))

    val schema=StructType(Array(
      StructField("name1",StringType,true),
      StructField("name2",StringType,true)
    ))

    val df=spark.createDataFrame(rdd, schema)
    df.show()

    val minfun = (str1:String,str2:String)=>{

      if(str1.compareTo(str2)<0)
        return str1
      else
        return str2

    }

    val maxfun = (str1:String,str2:String)=>{

      if(str1.compareTo(str2)>0)
        return str1
      else
        return str2

    }

    spark.udf.register("minUDF",minfun)
    spark.udf.register("maxUDF",maxfun)

    df.createOrReplaceTempView("friends")
    val ddd = spark.sql("select minUDF(name1,name2), maxUDF(name1,name2) from friends")
    ddd.show

    spark.stop()

  }
}
```

Upvotes: 0

Views: 200

Answers (1)

code.gsoni

Reputation: 695

Please do not write a UDF when there is a built-in function for the same purpose.

Try the SQL `least` and `greatest` functions to compute the min and max:

```scala
import org.apache.spark.sql.functions.least
import org.apache.spark.sql.functions.greatest

val ddd = spark.sql("select least(name1,name2), greatest(name1,name2) from friends")
ddd.show
```

Also, try to avoid UDFs in general: they must be serialized and shipped to all the worker nodes, which incurs serialization and deserialization costs, and they are opaque to Catalyst, so the optimizer cannot push down or rewrite them the way it can built-in functions.
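For reference, the two imports above are what you would use to do the same thing through the DataFrame API instead of SQL. A minimal sketch, assuming the `df` with columns `name1`/`name2` from the question:

```scala
import org.apache.spark.sql.functions.{col, least, greatest}

// Equivalent to the SQL query: per row, put the smaller string in
// name1 and the larger one in name2.
val sorted = df.select(
  least(col("name1"), col("name2")).as("name1"),
  greatest(col("name1"), col("name2")).as("name2")
)
sorted.show()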

Upvotes: 2
