fletchr

Reputation: 646

Spark-Scala: Filtering a Spark Dataset based on multiple columns and conditions

I'm having a difficult time finding a good way to filter a Spark Dataset. I've described the basic problem below:

  1. For every key, check whether there is a row with statusCode === "UV".
  2. If there is no UV status code associated with that key, ignore that key completely.
    • Please note: There should only ever be one UV for each key.
  3. If there is, search for the closest OA event after the UV timestamp.
    • Please note: There could be multiple OA events after the UV timestamp. I want the one closest to the UV timestamp.
  4. If the only OA event is in the past (i.e. before the UV), I still want to keep that record, because an expected OA will come in later. In that case I still want to capture the row with a status code of OA, but replace the value with null.

Input

+-----------+----------+-------------------+
|key        |statusCode|statusTimestamp    |
+-----------+----------+-------------------+
|AAAAAABBBBB|OA        |2019-05-24 14:46:00|
|AAAAAABBBBB|VD        |2019-05-31 19:31:00|
|AAAAAABBBBB|VA        |2019-06-26 00:00:00|
|AAAAAABBBBB|E         |2019-06-26 02:00:00|
|AAAAAABBBBB|UV        |2019-06-29 00:00:00|
|AAAAAABBBBB|OA        |2019-07-01 00:00:00|
|AAAAAABBBBB|EE        |2019-07-03 01:00:00|
+-----------+----------+-------------------+

Expected Output

+-----------+----------+-------------------+
|key        |statusCode|statusTimestamp    |
+-----------+----------+-------------------+
|AAAAAABBBBB|UV        |2019-06-29 00:00:00|
|AAAAAABBBBB|OA        |2019-07-01 00:00:00|
+-----------+----------+-------------------+

I know I could likely make a start by setting up the data like this, but does anyone have a suggestion on how to implement the filtering described above?

someDS
  .groupBy("key")
  .pivot("statusCode", Seq("UV", "OA"))
  .agg(collect_set($"statusTimestamp"))
  .thenSomeOtherStuff...
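
For reference, on the sample input above that pivot would produce something like the following (illustrative only; collect_set bundles the distinct timestamps into one array per key, in no guaranteed order):

+-----------+---------------------+------------------------------------------+
|key        |UV                   |OA                                        |
+-----------+---------------------+------------------------------------------+
|AAAAAABBBBB|[2019-06-29 00:00:00]|[2019-05-24 14:46:00, 2019-07-01 00:00:00]|
+-----------+---------------------+------------------------------------------+

So the "other stuff" would still have to pick the right OA timestamp out of the array and re-expand back to one row per status event.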

Upvotes: 1

Views: 314

Answers (1)

Leo C

Reputation: 22449

While the groupBy/pivot approach would group the timestamps nicely, it would require non-trivial steps (most likely a UDF) to perform the necessary filtering followed by re-expansion. Here's a different approach with the following steps:

  1. Filter the dataset down to rows with statusCode "UV" or "OA" only
  2. For each row, use Window functions (lag/lead) to build a String of the statusCodes from the previous, current, and next 2 rows
  3. Use Regex pattern matching on that String to identify the wanted rows

Sample code below:

import java.sql.Timestamp
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._  // assumes an existing SparkSession in scope named `spark` (e.g. spark-shell)

// Sample data:
//   key `A`: requirement #3
//   key `B`: requirement #2
//   key `C`: requirement #4  
val df = Seq(
  ("A", "OA", Timestamp.valueOf("2019-05-20 00:00:00")),
  ("A", "E",  Timestamp.valueOf("2019-05-30 00:00:00")),
  ("A", "UV", Timestamp.valueOf("2019-06-22 00:00:00")),
  ("A", "OA", Timestamp.valueOf("2019-07-01 00:00:00")),
  ("A", "OA", Timestamp.valueOf("2019-07-03 00:00:00")),
  ("B", "C",  Timestamp.valueOf("2019-06-15 00:00:00")),
  ("B", "OA", Timestamp.valueOf("2019-06-25 00:00:00")),
  ("C", "D",  Timestamp.valueOf("2019-06-01 00:00:00")),
  ("C", "OA", Timestamp.valueOf("2019-06-30 00:00:00")),
  ("C", "UV", Timestamp.valueOf("2019-07-02 00:00:00"))
).toDF("key", "statusCode", "statusTimestamp")

val win = Window.partitionBy("key").orderBy("statusTimestamp")

val df2 = df.
  where($"statusCode" === "UV" || $"statusCode" === "OA").
  withColumn("statusPrevCurrNext2", concat(
    coalesce(lag($"statusCode", 1).over(win), lit("")),
    lit("#"),
    $"statusCode",
    lit("#"),
    coalesce(lead($"statusCode", 1).over(win), lit("")),
    lit("#"),
    coalesce(lead($"statusCode", 2).over(win), lit(""))
  ))

Let's look at df2 (result of steps 1 and 2):

df2.show(false)
// +---+----------+-------------------+-------------------+
// |key|statusCode|statusTimestamp    |statusPrevCurrNext2|
// +---+----------+-------------------+-------------------+
// |B  |OA        |2019-06-25 00:00:00|#OA##              |
// |C  |OA        |2019-06-30 00:00:00|#OA#UV#            | <-- Req #4: ends with `#UV#`
// |C  |UV        |2019-07-02 00:00:00|OA#UV##            | <-- Req #4: ends with `#UV##`
// |A  |OA        |2019-05-20 00:00:00|#OA#UV#OA          |
// |A  |UV        |2019-06-22 00:00:00|OA#UV#OA#OA        | <-- Req #3: matches `^[^#]*#UV#`
// |A  |OA        |2019-07-01 00:00:00|UV#OA#OA#          | <-- Req #3: starts with `UV#`
// |A  |OA        |2019-07-03 00:00:00|OA#OA##            |
// +---+----------+-------------------+-------------------+

Applying step 3: the regex's three alternatives pick out, in order, every UV row (`^[^#]*#UV#.*`), the OA row immediately after the UV (`^UV#.*`), and the rows around a trailing UV with no OA after it (`.*#UV#+$`):

df2.
  where($"statusPrevCurrNext2".rlike("^[^#]*#UV#.*|^UV#.*|.*#UV#+$")).
  orderBy("key", "statusTimestamp").
  show(false)
// +---+----------+-------------------+-------------------+
// |key|statusCode|statusTimestamp    |statusPrevCurrNext2|
// +---+----------+-------------------+-------------------+
// |A  |UV        |2019-06-22 00:00:00|OA#UV#OA#OA        |
// |A  |OA        |2019-07-01 00:00:00|UV#OA#OA#          |
// |C  |OA        |2019-06-30 00:00:00|#OA#UV#            |
// |C  |UV        |2019-07-02 00:00:00|OA#UV##            |
// +---+----------+-------------------+-------------------+
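
One possible follow-up, not covered by the output above: the question also wants the requirement-#4 OA row kept but with its value replaced by null. A sketch of how that could be bolted on, assuming statusTimestamp is the value to null out (the `#UV#$` pattern marks the OA row directly before a trailing UV):

// Hypothetical extension (not part of the answer above): null out the
// timestamp of an OA row whose next event is the trailing UV (req #4).
df2.
  where($"statusPrevCurrNext2".rlike("^[^#]*#UV#.*|^UV#.*|.*#UV#+$")).
  withColumn("statusTimestamp",
    when($"statusPrevCurrNext2".rlike("#UV#$"), lit(null).cast("timestamp")).  // req #4 OA row
    otherwise($"statusTimestamp")).
  drop("statusPrevCurrNext2").
  orderBy("key", "statusTimestamp").
  show(false)
// Expected shape (nulls sort first under the default ascending order):
// +---+----------+-------------------+
// |key|statusCode|statusTimestamp    |
// +---+----------+-------------------+
// |A  |UV        |2019-06-22 00:00:00|
// |A  |OA        |2019-07-01 00:00:00|
// |C  |OA        |null               |
// |C  |UV        |2019-07-02 00:00:00|
// +---+----------+-------------------+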

Upvotes: 1
