Vineel

Reputation: 45

How do I fix the below error while creating a derived column in PySpark?

I am trying to bin a particular column in a dataframe based on the ranges given in a dictionary.

Below is the dataframe I used:

df

SNo,Name,CScore
1,x,700
2,y,850
3,z,560
4,a,578
5,b,456
6,c,678

I have created the below function; it works fine if I use it separately.


def binning(column, dict):
    # Walk every value in `column` and every (low, high) key in `dict`;
    # when a value falls in a key's range, append that key's label.
    finalColumn = []
    lent = len(column)
    for i in range(lent):
        for j in range(len(list(dict))):
            if int(column[i]) in range(list(dict)[j][0], list(dict)[j][1]):
                finalColumn.append(dict[list(dict)[j]])
    return finalColumn
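
For example, calling it on a plain list of values returns the expected labels:

>>> binning([700, 300], {(1, 400): 'Low', (401, 900): 'High'})
['High', 'Low']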

I have used the above function in the below statement.

newDf = df.withColumn("binnedColumn",binning(df.select("CScore").rdd.flatMap(lambda x: x).collect(),{(1,400):'Low',(401,900):'High'}))

I am getting the below error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\spark_2.4\python\pyspark\sql\dataframe.py", line 1988, in withColumn
    assert isinstance(col, Column), "col should be Column"
AssertionError: col should be Column

Any help to solve this issue would be greatly appreciated. Thanks.

Upvotes: 1

Views: 1342

Answers (1)

E.ZY.

Reputation: 725

Let's start by creating the data:

rdd = sc.parallelize([[1,"x",700],[2,"y",850],[3,"z",560],[4,"a",578],[5,"b",456],[6,"c",678]])
df = rdd.toDF(["SNo","Name","CScore"])
>>> df.show()
+---+----+------+
|SNo|Name|CScore|
+---+----+------+
|  1|   x|   700|
|  2|   y|   850|
|  3|   z|   560|
|  4|   a|   578|
|  5|   b|   456|
|  6|   c|   678|
+---+----+------+

The error occurs because withColumn expects a Column expression as its second argument, but your call passes a plain Python list (the result of collect()). If your final goal is to look up a categorical label from a binning dictionary like the one in your example, a UDF is the solution.

The following is your binning logic rewritten as a normal Python function:

bin_lookup = {(1, 400): 'Low', (401, 900): 'High'}

def binning(value, lookup_dict=bin_lookup):
    # Return the label of the (low, high) range containing `value`.
    for key in lookup_dict.keys():
        if key[0] <= value <= key[1]:
            return lookup_dict[key]
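
Called on a plain value, it returns the matching label:

>>> binning(700)
'High'
>>> binning(300)
'Low'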

To register it as a UDF and run it via the DataFrame:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

binning_udf = udf(binning, StringType())
>>> df.withColumn("binnedColumn", binning_udf("CScore")).show()
+---+----+------+------------+
|SNo|Name|CScore|binnedColumn|
+---+----+------+------------+
|  1|   x|   700|        High|
|  2|   y|   850|        High|
|  3|   z|   560|        High|
|  4|   a|   578|        High|
|  5|   b|   456|        High|
|  6|   c|   678|        High|
+---+----+------+------------+
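
As an aside, since the bins are fixed, the same result can be had without a UDF by chaining built-in when expressions, which Spark can optimize better than a Python UDF. A minimal sketch, assuming the same two bins:

from pyspark.sql import functions as F

# Chain one `when` per bin; rows matching no bin become null.
binned = (F.when((F.col("CScore") >= 1) & (F.col("CScore") <= 400), "Low")
           .when((F.col("CScore") >= 401) & (F.col("CScore") <= 900), "High"))
df.withColumn("binnedColumn", binned).show()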

Or apply the function directly to the RDD:

>>> rdd.map(lambda row: binning(row[-1], bin_lookup)).collect()
['High', 'High', 'High', 'High', 'High', 'High']

Upvotes: 1
