Reputation: 1330
I need to remove rows where, for the same id, p_id, and key_id, feedback is missing but at least one other row in that group has feedback present.
input
id  p_id  key_id  feedback
1   p1    k1      happy
1   p1    k1      sad
1   p1    k2      sad
1   p1    k2
1   p2    k3
2   p1    k3      sad
output
id  p_id  key_id  feedback
1   p1    k1      happy
1   p1    k1      sad
1   p1    k2      sad
1   p2    k3
2   p1    k3      sad
How can I achieve that in PySpark?
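For reference, a minimal sketch that reproduces the sample DataFrame, assuming missing feedback is stored as an empty string (it could also be NULL in the real data):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample data; missing feedback is represented as an empty string here
df = spark.createDataFrame(
    [(1, 'p1', 'k1', 'happy'),
     (1, 'p1', 'k1', 'sad'),
     (1, 'p1', 'k2', 'sad'),
     (1, 'p1', 'k2', ''),
     (1, 'p2', 'k3', ''),
     (2, 'p1', 'k3', 'sad')],
    ['id', 'p_id', 'key_id', 'feedback'])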
Upvotes: 0
Views: 1260
Reputation: 901
You can add a column (let's call it num_feedbacks) that counts, for each key ([id, p_id, key_id]), how many feedback values are present in the DataFrame. Then you can filter the DataFrame, keeping only the rows that either have a feedback value (feedback is not empty) or belong to a key with no feedback at all.
Here is the code example:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

key = ['id', 'p_id', 'key_id']

# Count the non-empty feedback values for each key
num_feedbacks = df.filter(col('feedback') != "")\
    .groupby(key).agg(F.count('feedback').alias('num_feedbacks'))

# Keep rows that have feedback, or whose key has no feedback at all
# (num_feedbacks comes back null from the left join in that case)
df = df.join(num_feedbacks, on=key, how='left')\
    .filter((col('feedback') != "") | (col('num_feedbacks').isNull()))\
    .drop('num_feedbacks')
Which gives you:
+---+----+------+--------+
| id|p_id|key_id|feedback|
+---+----+------+--------+
| 2| p1| k3| sad|
| 1| p1| k1| sad|
| 1| p1| k1| happy|
| 1| p1| k2| sad|
| 1| p2| k3| |
+---+----+------+--------+
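The join can also be avoided by computing the same count with a window function. Here is an equivalent sketch, under the same assumption that missing feedback is an empty string:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Count the non-empty feedback values within each (id, p_id, key_id) group
w = Window.partitionBy('id', 'p_id', 'key_id')
df = df.withColumn('num_feedbacks',
                   F.count(F.when(F.col('feedback') != "", True)).over(w))\
       .filter((F.col('feedback') != "") | (F.col('num_feedbacks') == 0))\
       .drop('num_feedbacks')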
Upvotes: 1
Reputation: 42352
I'd add a new column, max_length, holding the length of the longest trimmed feedback in each group, and filter by that column together with the feedback column:
import pyspark.sql.functions as F
from pyspark.sql.window import Window as W

# Longest trimmed feedback within each (id, p_id, key_id) group;
# max_length > 0 means the group has at least one non-empty feedback.
# This assumes missing feedback is an empty (or whitespace-only) string.
df = df.withColumn('max_length',
                   F.max(F.length(F.trim(F.col('feedback'))))
                    .over(W.partitionBy('id', 'p_id', 'key_id')))

# Drop rows whose group has feedback but whose own feedback is empty
cond = (F.col('max_length') != 0) & (F.length(F.trim(F.col('feedback'))) == 0)
df = df.filter(~cond).drop('max_length')
The trims just strip any surrounding spaces from the feedback column, so whitespace-only values are treated as missing too.
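For example, a value containing only spaces trims down to an empty string; a quick sanity check:
# A whitespace-only string trims to '' and therefore has length 0
spark.range(1).select(F.length(F.trim(F.lit('   '))).alias('len')).show()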
Upvotes: 2