Reputation: 633
Q: Is there any way to replace null values in specified columns with values from other rows in the same columns?
I want to replace the null values that lie between two rows that have the same values in the specified columns.
Original DF                                        Desired DF
+-----+----+-----+-----+----+----------+ +-----+----+-----+-----+----+----------+
|namea|Exam|name1|math1|phy1|Prev_Rank1| |namea|Exam|name1|math1|phy1|Prev_Rank1|
+-----+----+-----+-----+----+----------+ +-----+----+-----+-----+----+----------+
|Ahmad| 48| null| null|null| null| |Ahmad| 48| null| null|null| null|
|Ahmad| 49| null| null|null| null| |Ahmad| 49| null| null|null| null|
|Ahmad| 50|Ahmad| 50| 54| 3| |Ahmad| 50|Ahmad| 50| 54| 3|
|Ahmad| 51| null| null|null| null| |Ahmad| 51 Ahmad| 50| 54| 3|
|Ahmad| 53| null| null|null| null| |Ahmad| 53|Ahmad| 50| 54| 3|
|Ahmad| 54|Ahmad| 50| 54| 3| >>>>>>>> |Ahmad| 54|Ahmad| 50| 54| 3|
|Ahmad| 88| null| null|null| null| >>>>>>>> |Ahmad| 88| null| null|null| null|
|Ahmad| 90|Ahmad| 100| 90| 2| |Ahmad| 90|Ahmad| 100| 90| 2|
|Ahmad| 95| null| null|null| null| |Ahmad| 95|Ahmad| 100| 90| 2|
|Ahmad| 100|Ahmad| 100| 90| 2| |Ahmad| 100|Ahmad| 100| 90| 2|
|Ahmad| 101| null| null|null| null| |Ahmad| 101| null| null|null| null|
| Emma| 52| Emma| 52| 85| 1| | Emma| 52| Emma| 52| 85| 1|
| Emma| 85| Emma| 52| 85| 1| | Emma| 85| Emma| 52| 85| 1|
+-----+----+-----+-----+----+----------+ +-----+----+-----+-----+----+----------+
I tried to replace the null values using the following steps:
import sys
from pyspark.sql import functions as fn
from pyspark.sql.window import Window
w = Window.partitionBy('namea').orderBy('Exam').rowsBetween(-sys.maxsize, 0)
DF7 = Orignal_DF.withColumn("name1", fn.last('name1', True).over(w))
DF7 = DF7.withColumn("math1", fn.last('math1', True).over(w))
DF7 = DF7.withColumn("phy1", fn.last('phy1', True).over(w))
DF7 = DF7.withColumn("Prev_Rank1", fn.last('Prev_Rank1', True).over(w))
The resulting DF is shown below. Note that the forward fill also fills Exam 88 and Exam 101, which should stay null because they are not bounded by two rows with matching values:
+-----+----+-----+-----+----+----------+
|namea|Exam|name1|math1|phy1|Prev_Rank1|
+-----+----+-----+-----+----+----------+
|Ahmad| 48| null| null|null| null|
|Ahmad| 49| null| null|null| null|
|Ahmad| 50|Ahmad| 50| 54| 3|
|Ahmad| 51|Ahmad| 50| 54| 3|
|Ahmad| 53|Ahmad| 50| 54| 3|
|Ahmad| 54|Ahmad| 50| 54| 3|
|Ahmad| 88|Ahmad| 50| 54| 3|
|Ahmad| 90|Ahmad| 100| 90| 2|
|Ahmad| 95|Ahmad| 100| 90| 2|
|Ahmad| 100|Ahmad| 100| 90| 2|
|Ahmad| 101|Ahmad| 100| 90| 2|
| Emma| 52| Emma| 52| 85| 1|
| Emma| 85| Emma| 52| 85| 1|
+-----+----+-----+-----+----+----------+
Upvotes: 2
Views: 333
Reputation: 5055
Here is a possible solution.
The approach taken was to convert each column of the DataFrame into a list, smooth out the null values in each list that are bounded by repeated values, and then recombine the lists into a result list that is converted back into a DataFrame.
Set up a test data set as follows:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('testSmothing').getOrCreate()

df = spark.createDataFrame(
    data=[(None, 'A', 3),
          (None, 'B', None),
          (3, None, None),
          (None, 'C', -4),
          (None, 'D', -2),
          (3, None, None),
          (4, None, None),
          (5, 'G', -2),
          (6, 'H', -1),
          (None, 'I', -1),
          (None, None, -1),
          (8, 'I', -1),
          (9, 'J', -1)],
    schema=['x1', 'x2', 'x3'])
df.show()
+----+----+----+
| x1| x2| x3|
+----+----+----+
|null| A| 3|
|null| B|null|
| 3|null|null|
|null| C| -4|
|null| D| -2|
| 3|null|null|
| 4|null|null|
| 5| G| -2|
| 6| H| -1|
|null| I| -1|
|null|null| -1|
| 8| I| -1|
| 9| J| -1|
+----+----+----+
Helper function 1:
In the current sub-list, check whether the nulls are bounded by the same value:
def isRepeatBound(tempList):
    # Return True if some null in tempList is immediately followed by the
    # value that starts the sub-list, i.e. the nulls are closed by a repeat.
    count = 0
    startElt = tempList[0]
    for elt in tempList:
        if count < len(tempList) - 1:
            count = count + 1
            if elt is None and tempList[count] == startElt:
                return True
    return False
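As a quick sanity check of the boundary test (made-up lists, not part of the original data):
isRepeatBound([3, None, None, 3])   # True: the nulls are closed by another 3
isRepeatBound([3, None, 4])         # False: the null is followed by a different value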
Helper function 2:
Smooth out the current list:
def smoothLst(lst):
    # Walk the list, replacing nulls that sit between two occurrences of the
    # same value with that value.
    valueFound = False
    smooth = False
    lastValue = None
    colLst = []
    for index, elt in enumerate(lst):
        if elt is None and not valueFound:
            # No bounding value seen yet: leave the null as-is.
            colLst.append(elt)
        elif elt is None and valueFound:
            # Null after a value: fill it only if that value repeats later.
            colLst.append(lastValue if smooth else elt)
        elif index < len(lst) - 1 and isRepeatBound(lst[index:]):
            # A value that reappears after the coming nulls: start smoothing.
            smooth = True
            lastValue = elt
            valueFound = True
            colLst.append(elt)
        else:
            # A value with no repeat ahead (or the last element): stop smoothing.
            smooth = False
            valueFound = False
            colLst.append(elt)
    return colLst
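For example (again a made-up list), the nulls sitting between the two 3s are filled, while the trailing null has no closing 3 and is left alone:
smoothLst([3, None, None, 3, None, 4])   # returns [3, 3, 3, 3, None, 4]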
main()
Iterate over all columns. For each column, convert it into a list and smooth it using the helper functions above. Store each smoothed list in a master list, which is then transposed and converted back into the final result DF.
colNames = df.schema.names
resultlst = []
for name in colNames:
    # Pull the column down to the driver as a plain Python list.
    lst = df.select(df[name]).rdd.flatMap(lambda x: x).collect()
    smoothList = smoothLst(lst)
    resultlst.append(smoothList)

# Transpose the list of columns back into rows and rebuild the DataFrame.
transposeResultLst = list(map(list, zip(*resultlst)))
resultDF = spark.sparkContext.parallelize(transposeResultLst).toDF(['x1', 'x2', 'x3'])
resultDF.show()
+----+----+----+
| x1| x2| x3|
+----+----+----+
|null| A| 3|
|null| B|null|
| 3|null|null|
| 3| C| -4|
| 3| D| -2|
| 3|null| -2|
| 4|null| -2|
| 5| G| -2|
| 6| H| -1|
| 6| I| -1|
| 6| I| -1|
| 8| I| -1|
| 9| J| -1|
+----+----+----+
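As a side note, for the original question's DataFrame the same effect can probably be had without collecting every column to the driver, by extending the window functions the question already tried: forward-fill and backward-fill each column, then fill a null only where the bounding rows agree in every column. This is only a sketch against the question's names (Orignal_DF, namea, Exam, and the four value columns), not the list-based method above, and eqNullSafe needs Spark 2.3+:
from functools import reduce
from pyspark.sql import functions as fn
from pyspark.sql.window import Window

cols = ['name1', 'math1', 'phy1', 'Prev_Rank1']
base = Window.partitionBy('namea').orderBy('Exam')
w_fwd = base.rowsBetween(Window.unboundedPreceding, Window.currentRow)
w_bwd = base.rowsBetween(Window.currentRow, Window.unboundedFollowing)

# Last non-null value at or before each row, first non-null at or after it.
fwd = {c: fn.last(c, True).over(w_fwd) for c in cols}
bwd = {c: fn.first(c, True).over(w_bwd) for c in cols}

# Fill only where the bounding non-null values match in every column.
bounded = reduce(lambda a, b: a & b, [fwd[c].eqNullSafe(bwd[c]) for c in cols])

result = Orignal_DF.select(
    'namea', 'Exam',
    *[fn.when(bounded, fwd[c]).otherwise(fn.col(c)).alias(c) for c in cols])
On the question's sample this should keep Exam 88 and 101 null, since their bounding rows differ in math1, phy1, and Prev_Rank1.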
Upvotes: 2