Reputation: 792
I have a data frame which looks like this:
scala> val df = sc.parallelize(Seq(("User 1","X"), ("User 2", "Y"), ("User 3", "X"), ("User 2", "E"), ("User 3", "E"))).toDF("user", "event")
scala> df.show
+------+-----+
|  user|event|
+------+-----+
|User 1|    X|
|User 2|    Y|
|User 3|    X|
|User 2|    E|
|User 3|    E|
+------+-----+
I want to find all the users who have event "X" but do not have event "E".
In this case only 'User 1' qualifies, as it has an "X" entry and no "E" entry. How can I do this using the Spark API?
Upvotes: 3
Views: 2460
Reputation: 2281
You can group the rows by user, collect each user's events into a list, and then keep only the users whose list contains "X" but not "E".
import org.apache.spark.sql.functions.collect_list

// Column 0 is "user", column 1 is the collected "events" list.
val result = df.groupBy("user")
  .agg(collect_list("event").as("events"))
  .filter(p => p.getList(1).contains("X") && !p.getList(1).contains("E"))
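The same check can also be written with column expressions instead of a row lambda. This is only a minimal sketch, assuming Spark 2.x and that it is run as a script or pasted into a REPL; the local SparkSession setup and the name `users` are illustrative and not part of the original answer.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array_contains, col, collect_set}

// Assumption: a local session for illustration; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("x-not-e").getOrCreate()
import spark.implicits._

// Sample data from the question.
val df = Seq(("User 1", "X"), ("User 2", "Y"), ("User 3", "X"), ("User 2", "E"), ("User 3", "E"))
  .toDF("user", "event")

// Collect the distinct events per user, then test membership with array_contains.
val users = df.groupBy("user")
  .agg(collect_set("event").as("events"))
  .filter(array_contains(col("events"), "X") && !array_contains(col("events"), "E"))
  .select("user")
```

Using collect_set rather than collect_list keeps the per-user array small when a user has many repeated events.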
Upvotes: 4
Reputation: 7207
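A left join can be used:
import org.apache.spark.sql.functions.col
val xDF = df.filter(col("event") === "X")
val eDF = df.filter(col("event") === "E")
// Keep the "X" rows that have no matching "E" row for the same user.
val result = xDF.as("x").join(eDF.as("e"), List("user"), "left_outer").where(col("e.event").isNull).select(col("x.user"))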
Result is:
+------+
|user  |
+------+
|User 1|
+------+
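A closely related option, assuming Spark 2.0 or later, is the "left_anti" join type, which returns only the left-side rows that have no match on the right. The sketch below is illustrative rather than part of the original answer; the local SparkSession setup and the name `xWithoutE` are assumptions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumption: a local session for illustration; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("anti-join").getOrCreate()
import spark.implicits._

// Sample data from the question.
val df = Seq(("User 1", "X"), ("User 2", "Y"), ("User 3", "X"), ("User 2", "E"), ("User 3", "E"))
  .toDF("user", "event")

// Users with an "X" row, minus any user that also has an "E" row.
val xWithoutE = df.filter(col("event") === "X")
  .join(df.filter(col("event") === "E"), Seq("user"), "left_anti")
  .select("user")
  .distinct()
```

The anti join avoids the explicit null check, and the final distinct() guards against users with several "X" rows.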
Upvotes: 4
Reputation: 538
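You can pivot on the event column, which gives one count column per event, and then filter on the X and E columns:
val tmp = df.groupBy("user").pivot("event").count
tmp.show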
+------+----+----+----+
|  user|   E|   X|   Y|
+------+----+----+----+
|User 2|   1|null|   1|
|User 3|   1|   1|null|
|User 1|null|   1|null|
+------+----+----+----+
tmp.filter($"X".isNotNull && $"E".isNull).show
+------+----+---+----+
|  user|   E|  X|   Y|
+------+----+---+----+
|User 1|null|  1|null|
+------+----+---+----+
tmp.filter($"X".isNotNull && $"E".isNull).select("user", "X").show
+------+---+
|  user|  X|
+------+---+
|User 1|  1|
+------+---+
Hope this helps.
Upvotes: 3
Reputation: 41957
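You can count the rows for each user and the rows for each (user, event) pair, then keep the rows where both counts are equal and the event is X. Note that the counts are equal only when all of a user's rows share the same event, so a user with X and Y (and no E) is excluded as well.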
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
df.withColumn("count", count($"user").over(Window.partitionBy("user")))
  .withColumn("distinctCount", count($"user").over(Window.partitionBy("user", "event")))
  .filter($"count" === $"distinctCount" && $"event" === "X")
  .drop("count", "distinctCount")
For the sample data this returns the single row for User 1.
I hope the answer is helpful
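If users may also have events other than X and E, one alternative that stays with window functions is to count the E rows per user directly. This is a hedged sketch, not the original answer's method: the local SparkSession setup, the `eCount` column, the name `qualifying`, and the extra "User 4" rows are all hypothetical additions for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum, when}

// Assumption: a local session for illustration; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("window-e-count").getOrCreate()
import spark.implicits._

// Sample data from the question, plus a hypothetical "User 4" with X and Y but no E.
val df = Seq(
  ("User 1", "X"), ("User 2", "Y"), ("User 3", "X"), ("User 2", "E"), ("User 3", "E"),
  ("User 4", "X"), ("User 4", "Y")
).toDF("user", "event")

val byUser = Window.partitionBy("user")

// eCount is the number of "E" rows for the row's user; keep "X" rows where it is zero.
val qualifying = df
  .withColumn("eCount", sum(when(col("event") === "E", 1).otherwise(0)).over(byUser))
  .filter(col("eCount") === 0 && col("event") === "X")
  .select("user")
  .distinct()
```

With this data both User 1 and the hypothetical User 4 qualify, since neither has an E row.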
Upvotes: 1