samba

Reputation: 3111

Pyspark - how to drop records by primary keys?

I want to delete records from an old_df whenever new_df carries a del flag for the same primary key, metric_id. What is the right way to achieve this?

old_df (flag here is filled with nulls on purpose)

+---------+--------+-------------+
|metric_id| flag   |        value|
+---------+--------+-------------+
|       10|    null|       value2|
|       10|    null|       value9|
|       12|    null|updated_value|
|       15|    null|  test_value2|
+---------+--------+-------------+

new_df

+---------+--------+-------------+
|metric_id| flag   |        value|
+---------+--------+-------------+
|       10|     del|       value2|
|       12|    pass|updated_value|
|       15|     del|  test_value2|
+---------+--------+-------------+

result_df

+---------+--------+-------------+
|metric_id| flag   |        value|
+---------+--------+-------------+
|       12|    pass|updated_value|
+---------+--------+-------------+
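For reference, the two input frames above can be built like this (a minimal sketch, assuming an active SparkSession named spark; the explicit schema string is my own choice so the all-null flag column gets a type):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# old_df: flag is intentionally all null, so give it an explicit string type
old_df = spark.createDataFrame(
    [(10, None, 'value2'),
     (10, None, 'value9'),
     (12, None, 'updated_value'),
     (15, None, 'test_value2')],
    schema='metric_id int, flag string, value string',
)

new_df = spark.createDataFrame(
    [(10, 'del', 'value2'),
     (12, 'pass', 'updated_value'),
     (15, 'del', 'test_value2')],
    schema='metric_id int, flag string, value string',
)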

Upvotes: 0

Views: 231

Answers (1)

ernest_k

Reputation: 45329

One easy way to do this is to join then filter:

from pyspark.sql.functions import lit

result_df = (
    old_df.join(new_df, on='metric_id', how='left')
          # keep rows whose key is absent from new_df (flag is null after the
          # left join) or whose flag is anything other than 'del'
          .where((new_df['flag'].isNull()) | (new_df['flag'] != lit('del')))
          .select('metric_id', new_df['flag'], new_df['value'])
)

Which produces

+---------+----+-------------+
|metric_id|flag|        value|
+---------+----+-------------+
|       12|pass|updated_value|
+---------+----+-------------+

I'm using a left join because there might be records in old_df for which the primary key is not present in new_df (and you don't want to delete those).
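If you also want those kept rows to retain old_df's own flag and value (the left join above would bring in nulls from new_df for them), one alternative sketch, under the same assumptions, is an anti-join against just the keys flagged del:

from pyspark.sql import functions as F

# keys explicitly marked for deletion in new_df
del_keys = new_df.where(F.col('flag') == 'del').select('metric_id')

# drop those keys from old_df; every other row keeps its original columns
result_df = old_df.join(del_keys, on='metric_id', how='left_anti')

Note that this version keeps old_df's columns, so for metric_id 12 the flag stays null rather than showing new_df's pass value.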

Upvotes: 1
