Reputation: 593
I have a pyspark dataframe df of records. Each record has an id and a group, and marks whether two events (event1, event2) have occurred. For each group, I want to count the ids that fall into certain combinations of the two events (e.g. event2 but not event1, or both events).
Here is a simple example:
df:
| id  | event1 | event2 | group |
| 001 | 1      | 0      | A     |
| 001 | 1      | 0      | A     |
| 001 | 1      | 1      | A     |
| 002 | 0      | 1      | A     |
| 003 | 1      | 0      | A     |
| 003 | 1      | 1      | A     |
| ... | ...    | ...    | B     |
...
In the above df, for group = A, 2 ids have event1 (001, 003) and 3 ids have event2 (001, 002, 003). So, for example, the number of ids with event2 but not event1 is 1.
I hope to get something like this:
| group | event2_not_1 | event1_and_2 |
| A     | 1            | 2            |
| B     | ...          | ...          |
So far I have tried collecting, per group, the set of ids that appeared for each event, and then performing set operations on those columns of df_new separately. But this feels rather clumsy, e.g.:
from pyspark.sql.functions import col, collect_set, when

# collect, per group, the set of ids that have each event
df_new = (
    df.withColumn('event1_id', when(col('event1') == 1, col('id')))
      .withColumn('event2_id', when(col('event2') == 1, col('id')))
      .groupby('group')
      .agg(collect_set('event1_id').alias('has_event1'),
           collect_set('event2_id').alias('has_event2'))
)
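For what it's worth, I could probably finish this with the array functions available since Spark 2.4, roughly like the sketch below (counts is just a placeholder name):
counts = df_new.select(
    'group',
    # ids with event2 but not event1
    size(array_except(col('has_event2'), col('has_event1'))).alias('event2_not_1'),
    # ids with both events
    size(array_intersect(col('has_event1'), col('has_event2'))).alias('event1_and_2'),
)
(with array_except, array_intersect, size also imported from pyspark.sql.functions.)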
How do I achieve this elegantly in pyspark?
Upvotes: 0
Views: 211
Reputation: 13551
Use groupBy twice.
df.groupBy("group", "id").agg(f.max("event1").alias("event1"), f.max("event2").alias("event2")) \
.groupBy("group").agg(f.sum(f.expr("if(event2 = 1 and event1 = 0, 1, 0)")).alias("event2_not_1"), f.sum(f.expr("if(event1 = 1 and event2 = 1, 1, 0)")).alias("event1_and_2"))
+-----+------------+------------+
|group|event2_not_1|event1_and_2|
+-----+------------+------------+
|A |1 |2 |
+-----+------------+------------+
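The first groupBy collapses the data to one row per (group, id) using max, so each id is counted once no matter how many rows it has; the second groupBy then counts the event combinations per group. If you prefer the column API over SQL expressions, the second aggregation could be written with f.when instead (a sketch of the same logic, result is just an illustrative name):
per_id = df.groupBy("group", "id").agg(
    f.max("event1").alias("event1"),
    f.max("event2").alias("event2"),
)

result = per_id.groupBy("group").agg(
    f.sum(f.when((f.col("event2") == 1) & (f.col("event1") == 0), 1).otherwise(0)).alias("event2_not_1"),
    f.sum(f.when((f.col("event1") == 1) & (f.col("event2") == 1), 1).otherwise(0)).alias("event1_and_2"),
)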
Upvotes: 0