jdk2588

Reputation: 792

Using Spark, filter a data frame with conditions

I have a data frame which looks like

scala> val df = sc.parallelize(Seq(("User 1","X"), ("User 2", "Y"), ("User 3", "X"), ("User 2", "E"), ("User 3", "E"))).toDF("user", "event")

scala> df.show
+------+-----+
|  user|event|
+------+-----+
|User 1|    X|
|User 2|    Y|
|User 3|    X|
|User 2|    E|
|User 3|    E|
+------+-----+

I want to find all the users who have event "X" but don't have event "E".

In this case only 'User 1' qualifies, as it does not have an event "E" entry. How can I do this using the Spark API?

Upvotes: 3

Views: 2460

Answers (4)

FaigB

Reputation: 2281

You can group each user's events into a collection and then filter users based on a condition over that collection.

import org.apache.spark.sql.functions.collect_list

val result = df.groupBy("user")
    .agg(collect_list("event").as("events"))
    .filter(p => p.getList(1).contains("X") && !p.getList(1).contains("E"))
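
The same condition can also be written with the untyped Column API instead of a Row lambda; a minimal sketch using array_contains, assuming the same df as in the question:

import org.apache.spark.sql.functions.{collect_list, array_contains, col}

// keep users whose event collection contains "X" but not "E"
val result = df.groupBy("user")
    .agg(collect_list("event").as("events"))
    .filter(array_contains(col("events"), "X") && !array_contains(col("events"), "E"))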

Upvotes: 4

pasha701

Reputation: 7207

A left join can be used:

import org.apache.spark.sql.functions.col

val xDF = df.filter(col("event") === "X")
val eDF = df.filter(col("event") === "E")
val result = xDF.as("x").join(eDF.as("e"), List("user"), "left_outer")
    .where(col("e.event").isNull).select(col("x.user"))

Result is:

+------+
|user  |
+------+
|User 1|
+------+
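
Since Spark 2.0 the same result can be written with the built-in left_anti join type, which keeps only the left-side rows that have no match on the right; a minimal sketch, assuming the question's df:

// users that have an "X" row but no matching "E" row
val result = df.filter(col("event") === "X")
    .join(df.filter(col("event") === "E"), Seq("user"), "left_anti")
    .select("user")
    .distinct()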

Upvotes: 4

shengshan zhang

Reputation: 538

Another option is to pivot on the event column, then keep the rows where X is non-null and E is null:

val tmp = df.groupBy("user").pivot("event").count
tmp.show
+------+----+----+----+
|  user|   E|   X|   Y|
+------+----+----+----+
|User 2|   1|null|   1|
|User 3|   1|   1|null|
|User 1|null|   1|null|
+------+----+----+----+

tmp.filter(($"X" isNotNull) and ($"E" isNull)).show
+------+----+---+----+
|  user|   E|  X|   Y|
+------+----+---+----+
|User 1|null|  1|null|
+------+----+---+----+

tmp.filter(($"X" isNotNull) and ($"E" isNull)).select("user", "X").show
+------+---+
|  user|  X|
+------+---+
|User 1|  1|
+------+---+
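
If event can take many distinct values, you can restrict the pivot to just the two columns the condition needs; a sketch using the two-argument pivot overload, assuming the same df:

// only materialize the "X" and "E" pivot columns
df.groupBy("user")
    .pivot("event", Seq("X", "E"))
    .count
    .filter(($"X" isNotNull) and ($"E" isNull))
    .select("user")
    .show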

Hope this helps.

Upvotes: 3

Ramesh Maharjan

Reputation: 41957

You can count the rows for each user, count the rows for each user-event pair, and then keep the rows where both counts are equal and the event column has the value "X" (i.e. users whose only event is "X").

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

df.withColumn("count", count($"user").over(Window.partitionBy("user")))  // rows per user
    .withColumn("distinctCount", count($"user").over(Window.partitionBy("user", "event")))  // rows per user-event pair
    .filter($"count" === $"distinctCount" && $"event" === "X")
    .drop("count", "distinctCount")

You should get the result you want. I hope the answer is helpful.

Upvotes: 1
