Ahmad Senousi

Reputation: 633

PySpark: Replace null values that are bounded by the same values

Q: Is there any way to replace null values in specified columns with values from other rows of the same columns?

I want to replace the null values that lie between two rows that have the same values in the specified columns.

               Orignal DF                                          Desired DF
+-----+----+-----+-----+----+----------+            +-----+----+-----+-----+----+----------+
|namea|Exam|name1|math1|phy1|Prev_Rank1|            |namea|Exam|name1|math1|phy1|Prev_Rank1|
+-----+----+-----+-----+----+----------+            +-----+----+-----+-----+----+----------+
|Ahmad|  48| null| null|null|      null|            |Ahmad|  48| null| null|null|      null|
|Ahmad|  49| null| null|null|      null|            |Ahmad|  49| null| null|null|      null|
|Ahmad|  50|Ahmad|   50|  54|         3|            |Ahmad|  50|Ahmad|   50|  54|         3|
|Ahmad|  51| null| null|null|      null|            |Ahmad|  51|Ahmad|   50|  54|         3|
|Ahmad|  53| null| null|null|      null|            |Ahmad|  53|Ahmad|   50|  54|         3|
|Ahmad|  54|Ahmad|   50|  54|         3|  >>>>>>>>  |Ahmad|  54|Ahmad|   50|  54|         3|
|Ahmad|  88| null| null|null|      null|  >>>>>>>>  |Ahmad|  88| null| null|null|      null|
|Ahmad|  90|Ahmad|  100|  90|         2|            |Ahmad|  90|Ahmad|  100|  90|         2|
|Ahmad|  95| null| null|null|      null|            |Ahmad|  95|Ahmad|  100|  90|         2|
|Ahmad| 100|Ahmad|  100|  90|         2|            |Ahmad| 100|Ahmad|  100|  90|         2|
|Ahmad| 101| null| null|null|      null|            |Ahmad| 101| null| null|null|      null|
| Emma|  52| Emma|   52|  85|         1|            | Emma|  52| Emma|   52|  85|         1|
| Emma|  85| Emma|   52|  85|         1|            | Emma|  85| Emma|   52|  85|         1|
+-----+----+-----+-----+----+----------+            +-----+----+-----+-----+----+----------+

I tried to replace the null values using the following steps:

import sys
from pyspark.sql import Window, functions as fn

w = Window.partitionBy('namea').orderBy('Exam').rowsBetween(-sys.maxsize, 0)
DF7 = Orignal_DF.withColumn("name1", fn.last('name1', True).over(w))
DF7 = DF7.withColumn("math1", fn.last('math1', True).over(w))
DF7 = DF7.withColumn("phy1", fn.last('phy1', True).over(w))
DF7 = DF7.withColumn("Prev_Rank1", fn.last('Prev_Rank1', True).over(w))

The resulting DF is:

+-----+----+-----+-----+----+----------+
|namea|Exam|name1|math1|phy1|Prev_Rank1|
+-----+----+-----+-----+----+----------+
|Ahmad|  48| null| null|null|      null|
|Ahmad|  49| null| null|null|      null|
|Ahmad|  50|Ahmad|   50|  54|         3|
|Ahmad|  51|Ahmad|   50|  54|         3|
|Ahmad|  53|Ahmad|   50|  54|         3|
|Ahmad|  54|Ahmad|   50|  54|         3|
|Ahmad|  88|Ahmad|   50|  54|         3|
|Ahmad|  90|Ahmad|  100|  90|         2|
|Ahmad|  95|Ahmad|  100|  90|         2|
|Ahmad| 100|Ahmad|  100|  90|         2|
|Ahmad| 101|Ahmad|  100|  90|         2|
| Emma|  52| Emma|   52|  85|         1|
| Emma|  85| Emma|   52|  85|         1|
+-----+----+-----+-----+----+----------+

Upvotes: 2

Views: 333

Answers (1)

Grant Shannon

Reputation: 5055

here is a possible solution:

The approach taken is to convert each column of the data frame into a list, then for each list smooth out the null values that are bounded by repeated values, and finally recombine the lists into a result list that is converted back into a data frame.

set up a test data set as follows:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('testSmoothing').getOrCreate()
df = spark.createDataFrame(
    data=[(None, 'A', 3),
          (None, 'B', None),
          (3, None, None),
          (None, 'C', -4),
          (None, 'D', -2),
          (3, None, None),
          (4, None, None),
          (5, 'G', -2),
          (6, 'H', -1),
          (None, 'I', -1),
          (None, None, -1),
          (8, 'I', -1),
          (9, 'J', -1)],
    schema=['x1', 'x2', 'x3'])

df.show()

+----+----+----+
|  x1|  x2|  x3|
+----+----+----+
|null|   A|   3|
|null|   B|null|
|   3|null|null|
|null|   C|  -4|
|null|   D|  -2|
|   3|null|null|
|   4|null|null|
|   5|   G|  -2|
|   6|   H|  -1|
|null|   I|  -1|
|null|null|  -1|
|   8|   I|  -1|
|   9|   J|  -1|
+----+----+----+

helper function 1:

in the current sub-list - check to see if the nulls are bounded by the same value:

def isRepeatBound(tempList):
    # True if some null in tempList is immediately followed by the value the
    # list starts with, i.e. a run of nulls ends at a repeat of the first element
    count = 0
    startElt = tempList[0]
    for elt in tempList:
        if count < len(tempList) - 1:
            count = count + 1
            if elt is None and tempList[count] == startElt:
                return True
    return False
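
For example, on a couple of small lists (a quick check, not from the original answer):

isRepeatBound([3, None, None, 3, 4])   # True  - the nulls end at a repeat of 3
isRepeatBound([3, None, None, 4, 5])   # False - the run of nulls ends at a different value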

helper function 2:

smooth out the current list:

def smoothLst(lst):
    # fill a null with the last non-null value seen, but only if that value
    # reappears immediately after the run of nulls (detected by isRepeatBound)
    valueFound = False
    colLst = []
    index = 0
    smooth = False
    for elt in lst:
        if elt is None and valueFound == False:
            colLst.append(elt)
        elif elt is None and valueFound == True:
            if smooth == True:
                colLst.append(lastValue)   # null is bounded - fill it
            else:
                colLst.append(elt)
        elif index < len(lst) - 1 and isRepeatBound(lst[index:]) == True:
            # the current value reappears right after a later run of nulls,
            # so remember it as the fill value
            smooth = True
            lastValue = elt
            valueFound = True
            colLst.append(elt)
        else:
            smooth = False
            valueFound = False
            colLst.append(elt)
        index = index + 1

    return colLst
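
Called on a plain Python list, the function behaves like this (a quick check, not part of the original answer):

smoothLst([3, None, None, 3, 4])   # [3, 3, 3, 3, 4]        - nulls bounded by 3 are filled
smoothLst([3, None, None, 4, 5])   # [3, None, None, 4, 5]  - different bounds, left as null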

main()

Iterate over all columns. Convert each column into a list, smooth it using the helper functions above, and store the smoothed list in a master list, which is then transposed and converted back into the final result DF.

colNames = df.schema.names
resultlst = []
for name in colNames:
    # pull the column down to the driver as a plain Python list and smooth it
    lst = df.select(df[name]).rdd.flatMap(lambda x: x).collect()
    smoothList = smoothLst(lst)
    resultlst.append(smoothList)

# resultlst holds one list per column; transpose it back into rows
transposeResultLst = list(map(list, zip(*resultlst)))
resultDF = spark.sparkContext.parallelize(transposeResultLst).toDF(['x1', 'x2', 'x3'])

resultDF.show()

+----+----+----+
|  x1|  x2|  x3|
+----+----+----+
|null|   A|   3|
|null|   B|null|
|   3|null|null|
|   3|   C|  -4|
|   3|   D|  -2|
|   3|null|  -2|
|   4|null|  -2|
|   5|   G|  -2|
|   6|   H|  -1|
|   6|   I|  -1|
|   6|   I|  -1|
|   8|   I|  -1|
|   9|   J|  -1|
+----+----+----+
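
For comparison, the same per-column rule can also be sketched directly with window functions, without collecting the columns to the driver: take the nearest non-null value before and after each row and fill the null only when the two agree. This is only a sketch using the column names from the question, and, like the list-based approach above, it decides the fill independently for each column.

import pyspark.sql.functions as fn
from pyspark.sql import Window

# nearest non-null value at or before / at or after the current row
w_prev = Window.partitionBy('namea').orderBy('Exam') \
               .rowsBetween(Window.unboundedPreceding, 0)
w_next = Window.partitionBy('namea').orderBy('Exam') \
               .rowsBetween(0, Window.unboundedFollowing)

filled = Orignal_DF
for c in ['name1', 'math1', 'phy1', 'Prev_Rank1']:
    prev_val = fn.last(c, True).over(w_prev)    # last non-null looking back
    next_val = fn.first(c, True).over(w_next)   # first non-null looking forward
    # fill a null only when it is bounded by the same value on both sides
    filled = filled.withColumn(
        c,
        fn.when(fn.col(c).isNull() & (prev_val == next_val), prev_val)
          .otherwise(fn.col(c)))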

Upvotes: 2
