Reputation: 11
I am very new to Spark and Scala programming, and I have a problem that I hope someone can help me solve. I have a table named users with 4 columns: status, user_id, name, date.
Rows are:
status user_id name date
active 1 Peter 2020-01-01
active 2 John 2020-01-01
active 3 Alex 2020-01-01
inactive 1 Peter 2020-02-01
inactive 2 John 2020-01-01
I need to select only active users. Two users were inactivated, but only one of them (John) was inactivated on the same date as his active row.
My aim is to filter out the rows with inactive status (this I can do) and also to filter out an active row when an inactivation row matches it on the remaining columns. Peter was inactivated on a different date, so his active row is not filtered out. The desired result would be:
1 Peter 2020-01-01
3 Alex 2020-01-01
Rows with inactive status are filtered out. John was inactivated on the same date, so his active row is filtered out too.
The closest I have come is filtering out the rows that have inactive status:
import org.apache.spark.sql.functions.col

val users = spark.table("db.users")
  .filter(col("status").notEqual("inactive")) // status values in the table are lowercase
  .select("user_id", "name", "date")
Any ideas or suggestions how to solve this? Thanks!
Upvotes: 1
Views: 55
Reputation: 13551
First find the inactive entries by grouping on user and date, then left-join that result back to the original df and keep only the rows that have no match.
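import org.apache.spark.sql.functions.max
import spark.implicits._ // enables the 'column symbol syntax

// One row per (user_id, date) that has an inactive entry; max picks "inactive" because it sorts after "active"
val df2 = df.groupBy('user_id, 'date).agg(max('status).as("status"))
  .filter("status = 'inactive'")
  .withColumnRenamed("status", "inactive")

// Keep only the rows with no inactive entry for the same user_id and date
df.join(df2, Seq("user_id", "date"), "left")
  .filter('inactive.isNull)
  .select(df.columns.head, df.columns.tail: _*)
  .show()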
+------+-------+-----+----------+
|status|user_id| name|      date|
+------+-------+-----+----------+
|active|      1|Peter|2020-01-01|
|active|      3| Alex|2020-01-01|
+------+-------+-----+----------+
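A possible alternative to the group-by plus left join is a left anti join, which keeps only the left-side rows that have no match on the join keys. The sketch below is not part of the original answer: the object name ActiveUsersSketch, the helper activeOnly, and the local SparkSession are assumptions made for illustration, and the sample rows are the ones from the question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical helper object, used only for this illustration.
object ActiveUsersSketch {

  // Keeps active rows that have no inactive row with the same user_id and date.
  def activeOnly(users: DataFrame): DataFrame = {
    // Keys (user_id, date) of every inactivation row.
    val inactiveKeys = users
      .filter(col("status") === "inactive")
      .select("user_id", "date")

    users
      .filter(col("status") === "active")
      // left_anti drops every active row whose (user_id, date) appears in inactiveKeys.
      .join(inactiveKeys, Seq("user_id", "date"), "left_anti")
      .select("user_id", "name", "date")
  }

  def main(args: Array[String]): Unit = {
    // Local session, assumed here so the sketch can run without a cluster.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("active-users-sketch")
      .getOrCreate()
    import spark.implicits._

    // Sample rows copied from the question.
    val users = Seq(
      ("active", 1, "Peter", "2020-01-01"),
      ("active", 2, "John", "2020-01-01"),
      ("active", 3, "Alex", "2020-01-01"),
      ("inactive", 1, "Peter", "2020-02-01"),
      ("inactive", 2, "John", "2020-01-01")
    ).toDF("status", "user_id", "name", "date")

    activeOnly(users).show()
    spark.stop()
  }
}
```

With the sample rows from the question this should leave only the rows for Peter and Alex, without the status column. Compared with the aggregation approach, the anti join does not rely on "inactive" sorting after "active", at the cost of filtering the table twice.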
Upvotes: 1