Vinitkumar

Reputation: 121

Type mismatch in Spark UDF

I have created the following UDF to fetch only the part before the decimal point of decimal values.

def udf_cleansing(col1 : Double) = udf((col1 : Double) => {
  val col2 : String = f"$col1%.5f"
  if (col2.trim == "" || col2 == null) 0.toString else col2.substring(0, col2.indexOf("."))
})

However, when calling this function with a command like

df_aud.select(udf_cleansing(df_aud("HASH_TTL")))

I am getting the following error:

<console>:42: error: type mismatch;
 found   : org.apache.spark.sql.Column
 required: Double
       df_aud.select(udf_cleansing(df_aud("HASH_TTL")))

I also tried the command

df_aud.withColumn("newc",udf_cleansing(df_aud("HASH_TTL").cast("double")))

but I am still getting the same error.

Upvotes: 4

Views: 2718

Answers (2)

Ramesh Maharjan

Reputation: 41987

I would recommend using Spark's built-in functions as much as possible. Only when no built-in function can satisfy your needs would I suggest going with udf functions, since a udf requires the data to be serialized and deserialized to perform the operation you have devised.

What your udf function does can be achieved with the format_string and substring_index built-in functions, as below:

import org.apache.spark.sql.functions._
df_aud.select(substring_index(format_string("%.5f", df_aud("HASH_TTL")), ".", 1))
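
For illustration, here is a minimal runnable sketch of the same expression, assuming a SparkSession named spark is in scope (the sample values in HASH_TTL are hypothetical):

import org.apache.spark.sql.functions._
import spark.implicits._

// Hypothetical sample data standing in for df_aud.
val df_aud = Seq(123.456789, 0.5, 42.0).toDF("HASH_TTL")

// Format to 5 decimal places, then keep everything before the first ".".
df_aud
  .select(substring_index(format_string("%.5f", $"HASH_TTL"), ".", 1).as("HASH_TTL_trunc"))
  .show()
// +--------------+
// |HASH_TTL_trunc|
// +--------------+
// |           123|
// |             0|
// |            42|
// +--------------+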

Upvotes: 1

T. Gawęda

Reputation: 16096

The reason is that Scala treats df_aud("HASH_TTL") as a parameter to the udf_cleansing function itself, not to the UDF that this function returns.

Instead, you should write:

def udf_cleansing = udf(
    (col1 : Double) => {
        val col2 : String = f"$col1%.5f"
        // Check for null before calling trim, to avoid a NullPointerException.
        if (col2 == null || col2.trim == "") 0.toString else col2.substring(0, col2.indexOf("."))
    }
)

Now udf_cleansing returns a UDF. That UDF takes a parameter of type Column, and the column's value is provided to the wrapped inner function.

You can then use it exactly the way you already tried to use this function:
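
// Both calls from the question now work, since udf_cleansing
// is applied to a Column rather than expecting a Double.
df_aud.select(udf_cleansing(df_aud("HASH_TTL")))
df_aud.withColumn("newc", udf_cleansing(df_aud("HASH_TTL")))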

Upvotes: 3
