Sanjay

Reputation: 45

How to call a UDF with multiple arguments (currying) in Spark SQL?

How do I call the UDF below, which takes multiple arguments (currying), on a Spark DataFrame?

Read the file and get a List[String]:

val data = sc.textFile("file.csv").flatMap(line => line.split("\n")).collect.toList

Register the UDF:

val getValue = udf(Udfnc.getVal(_: Int, _: String, _: String)(_: List[String]))

Call the UDF on the DataFrame below:

df.withColumn("value",
     getValue(df("id"),
        df("string1"),
        df("string2"))).show()

Here I am missing the List[String] argument, and I am really not sure how I should pass this argument.

Upvotes: 2

Views: 13949

Answers (2)

swapnil shashank

Reputation: 987

Defining a UDF with multiple parameters:

val enrichUDF: UserDefinedFunction = udf((jsonData: String, id: Long) => {
  val lastOccurence = jsonData.lastIndexOf('}')
  val sid = ",\"site_refresh_stats_id\":" + id + " }]"
  val enrichedJson = jsonData.patch(lastOccurence, sid, sid.length)
  enrichedJson
})

Calling the UDF on an existing DataFrame:

val enrichedDF = EXISTING_DF
  .withColumn("enriched_column",
    enrichUDF(col("jsonData")
      , col("id")))

The following import is also required:

import org.apache.spark.sql.expressions.UserDefinedFunction
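
For reference, here is a minimal end-to-end sketch that puts these pieces together. It assumes a DataFrame named EXISTING_DF with jsonData and id columns; the sample rows and the app name are made up for illustration only:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.expressions.UserDefinedFunction

val spark = SparkSession.builder().appName("udf-example").getOrCreate()
import spark.implicits._

// Hypothetical sample data standing in for EXISTING_DF
val EXISTING_DF = Seq(("[{\"a\":1}", 10L), ("[{\"a\":2}", 20L)).toDF("jsonData", "id")

// Same UDF as above: appends an id field before the closing brace
val enrichUDF: UserDefinedFunction = udf((jsonData: String, id: Long) => {
  val lastOccurence = jsonData.lastIndexOf('}')
  val sid = ",\"site_refresh_stats_id\":" + id + " }]"
  jsonData.patch(lastOccurence, sid, sid.length)
})

val enrichedDF = EXISTING_DF
  .withColumn("enriched_column", enrichUDF(col("jsonData"), col("id")))

enrichedDF.show(false)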

Upvotes: 1

m-bhole

Reputation: 1189

I can make the following assumptions about your requirement based on your question:

a] The UDF should accept a parameter other than a dataframe column

b] The UDF should take multiple columns as parameters

Let's say you want to concatenate the values from all columns along with the specified parameter. Here is how you can do it:

import org.apache.spark.sql.functions._

// Returns a UDF that concatenates the three column values and the captured list
def uDF(strList: List[String]) = udf[String, Int, String, String](
  (value1: Int, value2: String, value3: String) =>
    value1.toString + "_" + value2 + "_" + value3 + "_" + strList.mkString("_"))

val df = spark.sparkContext.parallelize(Seq((1,"r1c1","r1c2"),(2,"r2c1","r2c2"))).toDF("id","str1","str2")

scala> df.show
+---+----+----+
| id|str1|str2|
+---+----+----+
|  1|r1c1|r1c2|
|  2|r2c1|r2c2|
+---+----+----+

val dummyList = List("dummy1","dummy2")
val result = df.withColumn("new_col", uDF(dummyList)(df("id"),df("str1"),df("str2")))



scala> result.show(2, false)
+---+----+----+-------------------------+
|id |str1|str2|new_col                  |
+---+----+----+-------------------------+
|1  |r1c1|r1c2|1_r1c1_r1c2_dummy1_dummy2|
|2  |r2c1|r2c2|2_r2c1_r2c2_dummy1_dummy2|
+---+----+----+-------------------------+
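
Since the question title also mentions Spark SQL: once the list is applied, the resulting UserDefinedFunction can be registered under a name and used from SQL (Spark 2.2+). A sketch, reusing the df and dummyList from above; the function name "concat_with_list" and the view name "my_table" are just examples:

// Register the partially-applied UDF under a name usable in SQL
spark.udf.register("concat_with_list", uDF(dummyList))

df.createOrReplaceTempView("my_table")

val sqlResult = spark.sql(
  "SELECT id, str1, str2, concat_with_list(id, str1, str2) AS new_col FROM my_table")

sqlResult.show(false)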

Upvotes: 5
