Mahesh Manjunath
Mahesh Manjunath

Reputation: 91

Pyspark : Deleting/Removing rows with conditions

I have a dataframe which looks like this

df = spark.createDataFrame([("1", "INSERT"),("2", "INSERT"),("3", "INSERT"), ("2", "MODIFY"), ("1", "DELETE"), ("3", "DELETE"), ("4", "INSERT")], ["id", "label"])
df.show()
+---+------+
| id| label|
+---+------+
|  1|INSERT|
|  2|INSERT|
|  3|INSERT|
|  2|MODIFY|
|  1|DELETE|
|  3|DELETE|
|  4|INSERT|
+---+------+

I'm trying to remove rows based on a condition that , those rows which have label as 'DELETE' which also has a 'INSERT' label with same id has to be removed

so the result would be something like:

+---+------+
| id| label|
+---+------+
|   |      |
|  2|INSERT|
|  2|MODIFY|      
|  4|INSERT|

+---+------+

tried used something like filter in pyspark

df.filter("label !='DELETE' AND id !='1'").show()

but not sure how to put more conditions. Do not want to use pandas.

Upvotes: 1

Views: 992

Answers (1)

Guoran Yun
Guoran Yun

Reputation: 339

try this:

from pyspark.sql import functions as F

df = spark.createDataFrame([("1", "INSERT"),("2", "INSERT"),("3", "INSERT"), ("2", "MODIFY"), ("1", "DELETE"), ("3", "DELETE"), ("4", "INSERT")], ["id", "label"])

df.groupBy(df.id).agg(F.collect_list(df.label).alias('labels')) \
    .where((~ F.array_contains(F.col('labels'), 'DELETE'))) \
    .select(df.id, F.explode(F.col('labels')).alias('label')) \
    .show()

output:

+---+------+
| id| label|
+---+------+
|  4|INSERT|
|  2|MODIFY|
|  2|INSERT|
+---+------+
  • Group by id and collect_list(label);
  • Filter out rows containing DELETE;
  • Restore to the original row with explode().

It should be noted that this does not preserve the original row order, nor does it save rows with null label.

If you have other columns, You can try the following code. There is no aggregation on the original DF, so other normal columns will not be affected.

from pyspark.sql import functions as F

df = spark.createDataFrame([("5", None, 'columnTest'), ("1", "INSERT", 'columnTest'), ("2", "INSERT", 'columnTest'),("3", "INSERT", 'columnTest'), ("2", "MODIFY", 'columnTest'), ("1", "DELETE", 'columnTest'), ("3", "DELETE", 'columnTest'), ("4", "INSERT", 'columnTest')], ["id", "label", 'columnTest'])

ids = df.where(df.label == 'DELETE') \
    .groupBy(df.label).agg(F.collect_set(df.id).alias('ids')).collect()[0]['ids']

df.where(~ df.id.isin(ids)).show()

output:

+---+------+----------+
| id| label|columnTest|
+---+------+----------+
|  5|  null|columnTest|
|  2|INSERT|columnTest|
|  2|MODIFY|columnTest|
|  4|INSERT|columnTest|
+---+------+----------+
  • Get all id of DELETE;
  • Just filter out rows id in ids.

This preserves the original row order and the label field with null value.

Upvotes: 2

Related Questions