Gourav

Reputation: 1265

Spark SQL UDF with variable number of parameters

I want a concat function for Spark SQL. I have written a UDF as follows:

sqlContext.udf.register("CONCAT", (args: String*) => {
  var out = ""
  for (arg <- args) {
    out += arg
  }
  out
})

sqlContext.sql("select col1,col2,CONCAT(col1,col2) from testtable")

but this UDF is not working and I am getting an exception. If I try it with a fixed number of parameters, it works. I am using Spark 1.3.1 and Scala 2.10.5.
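
For example, a fixed two-argument version along these lines works fine (the name CONCAT2 is just for illustration):

// illustrative fixed-arity registration
sqlContext.udf.register("CONCAT2", (a: String, b: String) => a + b)
sqlContext.sql("select col1,col2,CONCAT2(col1,col2) from testtable")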

Has anyone faced this issue or found a solution?

Upvotes: 2

Views: 4830

Answers (2)

zero323

Reputation: 330083

If all you want is to concatenate columns using raw SQL, there is no need for a custom UDF at all; the CONCAT function is already available:

import sqlContext.implicits._  // provides toDF and the $"..." column syntax used below

val df = sc.parallelize(List(("a", "b", "c"))).toDF("x", "y", "z")
df.registerTempTable("df")
sqlContext.sql("SELECT CONCAT(x, y, z) AS xyz FROM df").show

// +---+
// |xyz|
// +---+
// |abc|
// +---+

Since Spark 1.5.0 you can use the concat / concat_ws functions directly:

import org.apache.spark.sql.functions.{concat, concat_ws}

df.select(concat_ws("-", $"x", $"y", $"z").alias("x-y-z")).show
// +-----+
// |x-y-z|
// +-----+
// |a-b-c|
// +-----+

df.select(concat($"x", $"y", $"z").alias("xyz")).show

// +---+
// |xyz|
// +---+
// |abc|
// +---+

See also Spark UDF with varargs
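
A minimal sketch of the workaround described there, assuming string columns (the UDF name concatAll is illustrative): register a UDF that takes a single Seq argument and wrap the columns with array() on the SQL side:

// hedged sketch: pack the columns into an array and pass it to a Seq-based UDF
sqlContext.udf.register("concatAll", (xs: Seq[String]) => xs.mkString)
sqlContext.sql("SELECT col1, col2, concatAll(array(col1, col2)) FROM testtable")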

Upvotes: 2

Zyoma

Reputation: 1578

You can do this using the struct function, like the following:

val myUDF = udf {
  (r: Row) => r.toSeq.map(...) // the "r" row contains your arguments
}
val df = ....
df.select(col("col1"), myUDF(struct(col("col2"), col("col3"), col("col4"), ...)))
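
For the concatenation in the question, a self-contained sketch of this approach could look like the following (assuming string columns; the names concatRow and concatenated are illustrative):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, struct, udf}

// the UDF receives the packed columns as a single Row and joins them into one string
val concatRow = udf { (r: Row) => r.toSeq.map(_.toString).mkString }
df.select(col("col1"), concatRow(struct(col("col2"), col("col3"))).alias("concatenated"))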

Upvotes: 4
