Just_a_Rookie

Reputation: 11

Filtering inactivated rows in Spark using Scala

I am very new to Spark and Scala, and I have a problem I hope someone can help me solve. I have a table named users with four columns: status, user_id, name, and date.

The rows are:

status  user_id name    date
active      1   Peter   2020-01-01
active      2   John    2020-01-01
active      3   Alex    2020-01-01
inactive    1   Peter   2020-02-01
inactive    2   John    2020-01-01

I need to select only active users. Two users were inactivated, but only one of them (John) was inactivated on the same date as his active row.

My aim is to filter out the rows with inactive status (this I can do) and also to filter out the active row of any user whose inactivation row matches it on the other columns (user_id, name, date). Peter was inactivated on a different date, so his active row is not filtered out. The desired result would be:

1 Peter 2020-01-01
3 Alex 2020-01-01

The rows with inactive status are filtered out. John was inactivated on the same date as his active row, so that row is filtered out too.

The closest I have come is filtering out the rows that have inactive status:

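import org.apache.spark.sql.functions.col

// Keep only rows whose status is not "inactive" (=!= is the Column inequality operator)
val users = spark.table("db.users")
      .filter(col("status") =!= "inactive")
      .select("user_id", "name", "date")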

Any ideas or suggestions on how to solve this? Thanks!

Upvotes: 1

Views: 55

Answers (1)

Lamanus

Reputation: 13551

First find the (user_id, date) pairs that have an inactive row by grouping on user and date, then left join that result back to the original df and keep only the rows that have no match.

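import org.apache.spark.sql.functions.max
import spark.implicits._  // enables the 'column symbol syntax

// max('status) is "inactive" for any group containing an inactive row, since "inactive" > "active" as strings
val df2 = df.groupBy('user_id, 'date).agg(max('status).as("status"))
  .filter("status = 'inactive'")
  .withColumnRenamed("status", "inactive")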

// Rows with no matching (user_id, date) in df2 get a null "inactive" column; keep only those
df.join(df2, Seq("user_id", "date"), "left")
  .filter('inactive.isNull)
  .select(df.columns.head, df.columns.tail: _*)
  .show()

+------+-------+-----+----------+
|status|user_id| name|      date|
+------+-------+-----+----------+
|active|      1|Peter|2020-01-01|
|active|      3| Alex|2020-01-01|
+------+-------+-----+----------+
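As an alternative to the group-by and left join above, the same rule can be sketched with a left anti join, which keeps only the left-side rows that have no match on the join keys. This is a different technique from the one in the answer, not a restatement of it. The object name ActiveUsers, the helper keepActive, the local SparkSession, and the inline sample data (with date held as a string) are all assumptions made for illustration; in practice df would presumably come from spark.table("db.users").

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical helper object; the names are illustrative, not from the question.
object ActiveUsers {
  // Keeps active rows that have no inactive row with the same (user_id, date).
  def keepActive(df: DataFrame): DataFrame = {
    // Keys of every inactivation row
    val inactiveKeys = df
      .filter(col("status") === "inactive")
      .select("user_id", "date")

    df.filter(col("status") === "active")
      // left_anti returns only left-side rows with no match on the join keys
      .join(inactiveKeys, Seq("user_id", "date"), "left_anti")
      // Select explicitly so the column order does not depend on the join
      .select("user_id", "name", "date")
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session and inline sample rows stand in for db.users.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("active-users")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("active", 1, "Peter", "2020-01-01"),
      ("active", 2, "John", "2020-01-01"),
      ("active", 3, "Alex", "2020-01-01"),
      ("inactive", 1, "Peter", "2020-02-01"),
      ("inactive", 2, "John", "2020-01-01")
    ).toDF("status", "user_id", "name", "date")

    keepActive(df).show()
    spark.stop()
  }
}
```

On the sample rows from the question this should leave only Peter and Alex. The anti join avoids the extra nullable column and the reliance on string ordering in max('status), at the cost of reading df twice.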

Upvotes: 1
