
Reputation: 11

PySpark - Pass list as parameter to UDF + iterative dataframe column addition

I borrowed this example from a link!

I would like to understand why DataFrame a, after column 'category' has seemingly been added to it, does not have that column in a subsequent operation. Is DataFrame a somehow immutable? Is there another way to act on DataFrame a so that subsequent operations can access column 'category'? I know I could add all the columns at once to avoid the error, but that isn't what I want to do here. Thanks for your help; I am still on the learning curve.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# sample data
a = sqlContext.createDataFrame([("A", 20), ("B", 30), ("D", 80), ("E", 0)], ["Letter", "distances"])
label_list = ["Great", "Good", "OK", "Please Move", "Dead"]

# Pass the list as the default value of a parameter
def cate(feature_list, label=label_list):
    if feature_list == 0:
        return label[4]
    return 'I am not sure!'

def cate2(feature_list, label=label_list):
    if feature_list == 0:
        return label[4]
    elif feature_list.category == 'I am not sure!':
        return 'Why not?'

udfcate = udf(cate, StringType())
udfcate2 = udf(cate2, StringType())

a.withColumn("category", udfcate("distances")).show()
a.withColumn("category2", udfcate2("category")).show()

I get the following output and error:

C:\Users\gowreden\AppData\Local\Continuum\anaconda3\python.exe C:/Users/gowreden/PycharmProjects/DRC/src/
2018-08-09 09:06:42 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
|Letter|distances|      category|
|     A|       20|I am not sure!|
|     B|       30|I am not sure!|
|     D|       80|I am not sure!|
|     E|        0|          Dead|

Traceback (most recent call last):
  File "C:\Programs\spark-2.3.1-bin-hadoop2.7\python\pyspark\sql\", line 63, in deco
    return f(*a, **kw)
  File "C:\Programs\spark-2.3.1-bin-hadoop2.7\python\lib\\py4j\", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o34.withColumn.
: org.apache.spark.sql.AnalysisException: cannot resolve '`category`' given input columns: [Letter, distances];;
'Project [Letter#0, distances#1L, cate('category) AS category2#20]
+- AnalysisBarrier
      +- LogicalRDD [Letter#0, distances#1L], false

Upvotes: 0

Views: 1248

Answers (1)

Ala Tarighati

Reputation: 3817

I think there are two issues with your code:

  • First, as @pault said, withColumn is not in-place: it returns a new DataFrame and leaves a unchanged. You need to assign the result (a = a.withColumn(...)) before you can reference the new column.
  • Second, your cate2 function is not correct. You apply it to the column category, so feature_list is already the category value (a plain string), yet you then try to compare feature_list.category with something.

You can drop the first UDF entirely and use the built-in when/otherwise functions instead:

import pyspark.sql.functions as F

a = a.withColumn('category', F.when(a.distances == 0, label_list[4]).otherwise('I am not sure!'))
a.show()


|Letter|distances|      category|
|     A|       20|I am not sure!|
|     B|       30|I am not sure!|
|     D|       80|I am not sure!|
|     E|        0|          Dead|

And do something like this for the second function:

a = a.withColumn('category2', F.when(a.distances == 0, label_list[4]).when(a.category == 'I am not sure!', 'Why not?'))
a.show()


|Letter|distances|      category|category2|
|     A|       20|I am not sure!| Why not?|
|     B|       30|I am not sure!| Why not?|
|     D|       80|I am not sure!| Why not?|
|     E|        0|          Dead|     Dead|

Upvotes: 1
