belka

Reputation: 1530

Spark UDF not working: how to specify the column on which to apply it?

Let's say I have a DataFrame with a column named "X". I want to understand why the first snippet below fails to compile while the second one works. To me, the two look equivalent.

On the one hand, this doesn't work:

val dataDF = sqlContext
      .read
      .parquet(input_data)
      .select(
          "XXX", "YYY", "III"
      )
      .toDF(
          "X", "Y", "I"
      )
      .groupBy(
          "X", "Y"
      )
      .agg(
          sum("I").as("sum_I")
      )
      .orderBy(desc("sum_I"))
      .withColumn("f_sum_I", udf((x: Long) => f(x)).apply(dataDF("sum_I")))
      .drop("sum_I")

dataDF.show(50, false)

IntelliJ fails to compile the code and reports the following error:

Error:(88, 67) recursive value dataDF needs type
      .withColumn("f_sum_I", udf((x: Long) => f(x)).apply(dataDF("sum_I")))

On the other hand, it works if I replace that line with this:

.withColumn("f_sum_I", udf((x: Long) => f(x)).apply(col("sum_I")))

All I did was replace the reference to my DataFrame's column with the more generic function "col". I don't understand the difference, and in particular why the first form (using the name of the DataFrame) is not accepted.

Upvotes: 0

Views: 1951

Answers (1)

Tzach Zohar

Reputation: 37832

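You're trying to use dataDF before you're done defining it: dataDF is the result of the entire expression starting with sqlContext.read and ending with .drop("sum_I"), so you can't use it within that expression. The compiler has to infer the type of dataDF from that expression, which is why it reports "recursive value dataDF needs type" when the expression refers back to dataDF.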
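
The same compiler rule can be reproduced without Spark. This is only a minimal sketch; the names SelfReferenceDemo, base and xs are made up for illustration and do not come from the question:

```scala
object SelfReferenceDemo {
  // Rejected by the compiler with a "recursive value xs needs type" error,
  // because xs is referenced inside the expression that defines it:
  //
  //   val xs = List(1, 2, 3).map(_ + xs.size)

  // Works: the intermediate value is fully defined before it is used.
  val base: List[Int] = List(1, 2, 3)
  val xs: List[Int] = base.map(_ + base.size)
}
```

Adding a type annotation to the rejected line would not make it usable either, since the value has still not been assigned while its own right-hand side is being evaluated.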

You can solve this by simply referencing the column without using the DataFrame, e.g. using the col function from org.apache.spark.sql.functions:

.withColumn("f_sum_I", udf((x: Long) => f(x)).apply(col("sum_I")))
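
If you would rather keep referring to the column through a DataFrame, another option is to split the chain so the intermediate result has its own name. The following is a sketch under several assumptions: it uses SparkSession (Spark 2.x or later) instead of the question's sqlContext, replaces the parquet input with a few made-up rows, and defines a hypothetical f that doubles its argument, since the question does not show the real one. The names UdfColumnDemo, build, aggregated, viaCol and viaDataFrame are also invented here.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, desc, sum, udf}

object UdfColumnDemo {
  // Hypothetical stand-in for the question's f, which is not shown there.
  def f(x: Long): Long = x * 2

  // Builds the same result twice: once via col(...), once via the named DataFrame.
  def build(spark: SparkSession): (DataFrame, DataFrame) = {
    import spark.implicits._

    val fUdf = udf((x: Long) => f(x))

    // Made-up sample rows standing in for the parquet input.
    val aggregated = Seq(("a", "b", 1L), ("a", "b", 2L), ("c", "d", 5L))
      .toDF("X", "Y", "I")
      .groupBy("X", "Y")
      .agg(sum("I").as("sum_I"))
      .orderBy(desc("sum_I"))

    // Option 1: refer to the column by name only, as in the answer.
    val viaCol = aggregated
      .withColumn("f_sum_I", fUdf(col("sum_I")))
      .drop("sum_I")

    // Option 2: aggregated is already fully defined, so aggregated("sum_I") is legal.
    val viaDataFrame = aggregated
      .withColumn("f_sum_I", fUdf(aggregated("sum_I")))
      .drop("sum_I")

    (viaCol, viaDataFrame)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("udf-column-demo")
      .getOrCreate()

    val (viaCol, viaDataFrame) = build(spark)
    viaCol.show(50, false)
    viaDataFrame.show(50, false)

    spark.stop()
  }
}
```

Both options should produce the same rows. The col version keeps everything in a single chain, while the split version is handy when the intermediate DataFrame is needed elsewhere anyway.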

Upvotes: 2
