Reputation: 646
I'm having a difficult time trying to find a good way to filter a spark Dataset. I've described the basic problem below:
Input
+-----------+----------+-------------------+
|key        |statusCode|statusTimestamp    |
+-----------+----------+-------------------+
|AAAAAABBBBB|OA        |2019-05-24 14:46:00|
|AAAAAABBBBB|VD        |2019-05-31 19:31:00|
|AAAAAABBBBB|VA        |2019-06-26 00:00:00|
|AAAAAABBBBB|E         |2019-06-26 02:00:00|
|AAAAAABBBBB|UV        |2019-06-29 00:00:00|
|AAAAAABBBBB|OA        |2019-07-01 00:00:00|
|AAAAAABBBBB|EE        |2019-07-03 01:00:00|
+-----------+----------+-------------------+
Expected Output
+-----------+----------+-------------------+
|key        |statusCode|statusTimestamp    |
+-----------+----------+-------------------+
|AAAAAABBBBB|UV        |2019-06-29 00:00:00|
|AAAAAABBBBB|OA        |2019-07-01 00:00:00|
+-----------+----------+-------------------+
I know I could likely solve the problem by setting the data up like this, but does anyone have a suggestion for how to implement the filter described above?
someDS
  .groupBy("key")
  .pivot("statusCode", Seq("UV", "OA"))
  .agg(collect_set($"statusTimestamp"))
  .thenSomeOtherStuff...
Upvotes: 1
Views: 314
Reputation: 22449
While the groupBy/pivot approach would group the timestamps nicely, it would require non-trivial steps (most likely a UDF) to perform the necessary filtering followed by re-expansion. Here's a different approach with the following steps:

1. Filter the dataset for statusCode "UV" or "OA" only
2. For each row, concatenate statusCode from the previous, current, and next 2 rows (per key, using a Window function)
3. Use Regex pattern matching to identify the wanted rows

Sample code below:
import java.sql.Timestamp
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._
// Sample data:
// key `A`: requirement #3
// key `B`: requirement #2
// key `C`: requirement #4
val df = Seq(
  ("A", "OA", Timestamp.valueOf("2019-05-20 00:00:00")),
  ("A", "E", Timestamp.valueOf("2019-05-30 00:00:00")),
  ("A", "UV", Timestamp.valueOf("2019-06-22 00:00:00")),
  ("A", "OA", Timestamp.valueOf("2019-07-01 00:00:00")),
  ("A", "OA", Timestamp.valueOf("2019-07-03 00:00:00")),
  ("B", "C", Timestamp.valueOf("2019-06-15 00:00:00")),
  ("B", "OA", Timestamp.valueOf("2019-06-25 00:00:00")),
  ("C", "D", Timestamp.valueOf("2019-06-01 00:00:00")),
  ("C", "OA", Timestamp.valueOf("2019-06-30 00:00:00")),
  ("C", "UV", Timestamp.valueOf("2019-07-02 00:00:00"))
).toDF("key", "statusCode", "statusTimestamp")
// Window over each key, ordered by timestamp, so lag/lead see the neighboring statuses
val win = Window.partitionBy("key").orderBy("statusTimestamp")

// Keep only "UV"/"OA" rows, then assemble "prev#curr#next#next2" for each row;
// missing neighbors at the partition edges become empty strings
val df2 = df.
  where($"statusCode" === "UV" || $"statusCode" === "OA").
  withColumn("statusPrevCurrNext2", concat(
    coalesce(lag($"statusCode", 1).over(win), lit("")),
    lit("#"),
    $"statusCode",
    lit("#"),
    coalesce(lead($"statusCode", 1).over(win), lit("")),
    lit("#"),
    coalesce(lead($"statusCode", 2).over(win), lit(""))
  ))
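Side note: lag and lead also accept a default value, so the coalesce/lit("") wrapping can be folded into the window calls, with concat_ws supplying the "#" separators. A roughly equivalent sketch (df2Alt is just an illustrative name, not part of the answer above; same df and win):

// Variant: lag/lead defaults replace coalesce; concat_ws inserts the "#" separators
val df2Alt = df.
  where($"statusCode" === "UV" || $"statusCode" === "OA").
  withColumn("statusPrevCurrNext2", concat_ws("#",
    lag($"statusCode", 1, "").over(win),
    $"statusCode",
    lead($"statusCode", 1, "").over(win),
    lead($"statusCode", 2, "").over(win)
  ))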
Let's look at df2 (result of steps 1 and 2):
df2.show(false)
// +---+----------+-------------------+-------------------+
// |key|statusCode|statusTimestamp    |statusPrevCurrNext2|
// +---+----------+-------------------+-------------------+
// |B  |OA        |2019-06-25 00:00:00|#OA##              |
// |C  |OA        |2019-06-30 00:00:00|#OA#UV#            | <-- Req #4: Ends with `#UV#`
// |C  |UV        |2019-07-02 00:00:00|OA#UV##            | <-- Req #4: Ends with `#UV##`
// |A  |OA        |2019-05-20 00:00:00|#OA#UV#OA          |
// |A  |UV        |2019-06-22 00:00:00|OA#UV#OA#OA        | <-- Req #3: Starts with `[^#]*#UV#`
// |A  |OA        |2019-07-01 00:00:00|UV#OA#OA#          | <-- Req #3: Starts with `UV#`
// |A  |OA        |2019-07-03 00:00:00|OA#OA##            |
// +---+----------+-------------------+-------------------+
Applying step 3:
df2.
  where($"statusPrevCurrNext2".rlike("^[^#]*#UV#.*|^UV#.*|.*#UV#+$")).
  orderBy("key", "statusTimestamp").
  show(false)
// +---+----------+-------------------+-------------------+
// |key|statusCode|statusTimestamp    |statusPrevCurrNext2|
// +---+----------+-------------------+-------------------+
// |A  |UV        |2019-06-22 00:00:00|OA#UV#OA#OA        |
// |A  |OA        |2019-07-01 00:00:00|UV#OA#OA#          |
// |C  |OA        |2019-06-30 00:00:00|#OA#UV#            |
// |C  |UV        |2019-07-02 00:00:00|OA#UV##            |
// +---+----------+-------------------+-------------------+
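If you only need the original three columns back, the helper column can simply be dropped after the filter; a minimal follow-up sketch using the same df2 and regex as above:

// Same filter as above, then drop the helper column to restore the original schema
df2.
  where($"statusPrevCurrNext2".rlike("^[^#]*#UV#.*|^UV#.*|.*#UV#+$")).
  drop("statusPrevCurrNext2").
  orderBy("key", "statusTimestamp").
  show(false)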
Upvotes: 1