Reputation: 617
The below code failing to capture the 'null' value records. From below df1, the column NO . 5 has a null value (name field).
As per my below requirement OutputDF, the No. 5 record should come as mentioned. But after below code execution this record is not coming into the final output. The records with 'null' values are not coming into the output. Except this, remaining everything fine.
df1
NO DEPT NAME SAL
1 IT RAM 1000
2 IT SRI 600
3 HR GOPI 1500
5 HW 700
df2
NO DEPT NAME SAL
1 IT RAM 1000
2 IT SRI 900
4 MT SUMP 1200
5 HW MAHI 700
OutputDF
NO DEPT NAME SAL FLAG
1 IT RAM 1000 SAME
2 IT SRI 900 UPDATE
4 MT SUMP 1200 INSERT
3 HR GOPI 1500 DELETE
5 HW MAHI 700 UPDATE
from pyspark.shell import spark
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
sc = spark.sparkContext
filedf1 = spark.read.option("header","true").option("delimiter", ",").csv("C:\\files\\file1.csv")
filedf2 = spark.read.option("header","true").option("delimiter", ",").csv("C:\\files\\file2.csv")
filedf1.createOrReplaceTempView("table1")
filedf2.createOrReplaceTempView("table2")
df1 = spark.sql( "select * from table1" )
df2 = spark.sql( "select * from table2" )
#DELETE
df_d = df1.join(df2, df1.NO == df2.NO, 'left').filter(F.isnull(df2.NO)).select(df1.NO,df1.DEPT,df1.NAME,df1.SAL, F.lit('DELETE').alias('FLAG'))
print("df_d left:",df_d.show())
#INSERT
df_i = df1.join(df2, df1.NO == df2.NO, 'right').filter(F.isnull(df1.NO)).select(df2.NO,df2.DEPT,df2.NAME,df2.SAL, F.lit('INSERT').alias('FLAG'))
print("df_i right:",df_i.show())
#SAME
df_s = df1.join(df2, df1.NO == df2.NO, 'inner').filter(F.concat(df2.NO,df2.DEPT,df2.NAME,df2.SAL) == F.concat(df1.NO,df1.DEPT,df1.NAME,df1.SAL)).select(df1.NO,df1.DEPT,df1.NAME,df1.SAL, F.lit('SAME').alias('FLAG'))
print("df_s inner:",df_s.show())
#UPDATE
df_u = df1.join(df2, df1.NO == df2.NO, 'inner').filter(F.concat(df2.NO,df2.DEPT,df2.NAME,df2.SAL) != F.concat(df1.NO,df1.DEPT,df1.NAME,df1.SAL)).select(df2.NO,df2.DEPT,df2.NAME,df2.SAL, F.lit('UPDATE').alias('FLAG'))
print("df_u inner:",df_u.show())
df = df_d.union(df_i).union(df_s).union(df_u)
df.show()
Here i'm comparing both df1 and df2, if found new records in df2 taking flag as INSERT, if record is same in both dfs then taking as SAME, if the record is in df1 and not in df2 taking as DELETE and if the record exist in both dfs but with different values then taking df2 values as UPDATE.
Upvotes: 0
Views: 1307
Reputation: 1338
There's two issues with the code:
The result of F.concat of a null returns null, so this part in code filters out row row NO 5:
.filter(F.concat(df2.NO, df2.NAME, df2.SAL) != F.concat(df1.NO, df1.NAME, df1.SAL))
You are only selecting df2. It's fine in the example case above, but if your df2 has a null then the resultant dataframe will have null.
You can try concatenating it with a udf below:
def concat_cols(row):
concat_row = ''.join([str(col) for col in row if col is not None])
return concat_row
udf_concat_cols = udf(concat_cols, StringType())
The function concat_row
can be broken down into two parts:
[str(col) for col in row if col is not None] is a list comprehension, it does as it reads: for each column in the row, if
the column is not None, then append the str(col) into the list.
List comprehension is just a more pythonic way of doing this:
mylist = []
for col in row:
if col is not None:
mylist.append(col))
You can replace your update code as:
df_u = (df1
.join(df2, df1.NO == df2.NO, 'inner')
.filter(udf_concat_cols(struct(df1.NO, df1.NAME, df1.SAL)) != udf_concat_cols(struct(df2.NO, df2.NAME, df2.SAL)))
.select(coalesce(df1.NO, df2.NO),
coalesce(df1.NAME, df2.NAME),
coalesce(df1.SAL, df2.SAL),
F.lit('UPDATE').alias('FLAG')))
You should do something similar for your #SAME flag and break the line for readability.
Update:
If df2 always have the correct (updated) result, there is no need to coalesce. The code for this instance would be:
df_u = (df1
.join(df2, df1.NO == df2.NO, 'inner')
.filter(udf_concat_cols(struct(df1.NO, df1.NAME, df1.SAL)) != udf_concat_cols(struct(df2.NO, df2.NAME, df2.SAL)))
.select(df2.NO,
df2.NAME,
df2.SAL,
F.lit('UPDATE').alias('FLAG')))
Upvotes: 1