Reputation: 152
In Spark (Python) I have to merge two dataframes that have the same column names and types. A few conditions apply to the merge:
- If UPDATE_TYPE is UPDATE, ignore all values from that row other than DISPOSITION and TIME. For DISPOSITION, use the value from that row (the one whose update type is UPDATE); for TIME, use the greater of the two rows' times.
- If UPDATE_TYPE is not UPDATE, keep all values from the table-two row other than TIME; for TIME, use the greater of the two times.
I have done this with reduceByKey, but it is a very slow solution. Can I do it directly with dataframes?
df1
ID  UPDATE_TYPE  TIME  DISPOSITION  ROG
1   SEGMENT      1000  null         Q
2   SEGMENT      1001  value        W
3   SEGMENT      1002  null         E
3   UPDATE       1004  some_value   A
4   SEGMENT      1003  null         R
5   SEGMENT      1004  value_old    T
7   SEGMENT      1050  value        U
df2
ID  UPDATE_TYPE  TIME  DISPOSITION  ROG
4   SEGMENT      1003  value        P1
5   UPDATE       1015  value_new    P2
6   SEGMENT      1010  value        P3
Final output
df_output
ID  UPDATE_TYPE  TIME  DISPOSITION  ROG
1   SEGMENT      1000  null         Q
2   SEGMENT      1001  value        W
3   SEGMENT      1004  some_value   E
4   SEGMENT      1003  null         P1
5   SEGMENT      1015  value_new    T
6   SEGMENT      1010  value        P3
7   SEGMENT      1050  value        U
EDIT: It turns out duplicate IDs can exist in table one itself. These duplicate IDs must not appear as separate rows in the final output (e.g. ID 3 is collapsed into a single row).
Upvotes: 0
Views: 67
Reputation: 2451
You could try Spark SQL. A FULL OUTER JOIN is needed so that IDs present only in DF2 (ID 6 here) are kept, and Spark's GREATEST skips NULLs, so the time comparison works even when only one table has the row:
SELECT
    COALESCE(DF1.ID, DF2.ID) AS ID,
    COALESCE(DF1.UPDATE_TYPE, DF2.UPDATE_TYPE) AS UPDATE_TYPE,
    GREATEST(DF1.TIME, DF2.TIME) AS TIME,  -- GREATEST skips NULLs in Spark
    CASE WHEN DF2.UPDATE_TYPE = 'UPDATE' THEN DF2.DISPOSITION
         WHEN DF1.ID IS NULL THEN DF2.DISPOSITION
         ELSE DF1.DISPOSITION END AS DISPOSITION,
    CASE WHEN DF2.UPDATE_TYPE = 'UPDATE' OR DF2.ID IS NULL THEN DF1.ROG
         ELSE DF2.ROG END AS ROG
FROM
    DF1 FULL OUTER JOIN DF2 ON DF1.ID = DF2.ID
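Note that this still returns two rows for ID 3, because those duplicates live inside DF1 itself; they have to be collapsed before the join. Since you asked whether you can work with dataframes directly: below is a minimal PySpark sketch of the whole thing, including that pre-merge step. The sample data is copied from the question; splitting df1 into SEGMENT and UPDATE rows (assuming at most one of each per ID) is my reading of how the duplicates should combine, based on the expected output for ID 3.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

schema = "ID INT, UPDATE_TYPE STRING, TIME INT, DISPOSITION STRING, ROG STRING"
df1 = spark.createDataFrame([
    (1, "SEGMENT", 1000, None, "Q"),
    (2, "SEGMENT", 1001, "value", "W"),
    (3, "SEGMENT", 1002, None, "E"),
    (3, "UPDATE", 1004, "some_value", "A"),
    (4, "SEGMENT", 1003, None, "R"),
    (5, "SEGMENT", 1004, "value_old", "T"),
    (7, "SEGMENT", 1050, "value", "U"),
], schema)
df2 = spark.createDataFrame([
    (4, "SEGMENT", 1003, "value", "P1"),
    (5, "UPDATE", 1015, "value_new", "P2"),
    (6, "SEGMENT", 1010, "value", "P3"),
], schema)

# Step 1: collapse duplicate IDs inside df1. The UPDATE row contributes
# only DISPOSITION and TIME, as in the expected output for ID 3.
seg = df1.filter(F.col("UPDATE_TYPE") != "UPDATE")
upd = (df1.filter(F.col("UPDATE_TYPE") == "UPDATE")
          .select(F.col("ID"),
                  F.col("TIME").alias("U_TIME"),
                  F.col("DISPOSITION").alias("U_DISP")))
df1m = (seg.join(upd, "ID", "left")
           .select("ID", "UPDATE_TYPE",
                   F.greatest("TIME", "U_TIME").alias("TIME"),
                   F.coalesce("U_DISP", "DISPOSITION").alias("DISPOSITION"),
                   "ROG"))

# Step 2: full outer join with df2 and apply the same rules across tables.
a, b = df1m.alias("a"), df2.alias("b")
df_output = (a.join(b, F.col("a.ID") == F.col("b.ID"), "full_outer")
    .select(
        F.coalesce("a.ID", "b.ID").alias("ID"),
        F.coalesce("a.UPDATE_TYPE", "b.UPDATE_TYPE").alias("UPDATE_TYPE"),
        F.greatest("a.TIME", "b.TIME").alias("TIME"),  # greatest() skips NULLs
        F.when(F.col("b.UPDATE_TYPE") == "UPDATE", F.col("b.DISPOSITION"))
         .when(F.col("a.ID").isNull(), F.col("b.DISPOSITION"))
         .otherwise(F.col("a.DISPOSITION")).alias("DISPOSITION"),
        F.when((F.col("b.UPDATE_TYPE") == "UPDATE") | F.col("b.ID").isNull(),
               F.col("a.ROG"))
         .otherwise(F.col("b.ROG")).alias("ROG")))

df_output.orderBy("ID").show()
Staying in the DataFrame API (or SQL) lets Catalyst plan the joins, which should be considerably faster than a reduceByKey round-trip through RDDs.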
Upvotes: 1