Yuchen

Reputation: 33036

Spark Window function last not null value

We have a time series database of user events, which looks like the following:

timestamp             user_id     event            ticke_type     error_type
2019-06-06 14:33:31   user_a      choose_ticket    ticke_b        NULL
2019-06-06 14:34:31   user_b      choose_ticket    ticke_f        NULL
2019-06-06 14:36:31   user_a      booking_error    NULL           error_c
2019-06-06 14:37:31   user_a      choose_ticket    ticke_h        NULL
2019-06-06 14:38:31   user_a      booking_error    NULL           error_d
2019-06-06 14:39:31   user_a      booking_error    NULL           error_e

Here is one use case that we need to support:

To investigate which ticket type is causing a booking error, we have to look at the ticket type, which is available only on the earlier choose_ticket event.

In other words, for each booking_error event, we want to find the previous choose_ticket event for the same user and merge its ticket type into the booking_error event.

So ideally, the output that we want is:

timestamp             user_id     event            ticke_type     error_type
2019-06-06 14:36:31   user_a      booking_error    ticke_b        error_c
2019-06-06 14:38:31   user_a      booking_error    ticke_h        error_d
2019-06-06 14:39:31   user_a      booking_error    ticke_h        error_e

The closest that I can find is Spark add new column to dataframe with value from previous row, which lets us take a property from the previous event and apply it to the event immediately after.

This almost works, except that when there are multiple consecutive events of the same kind (booking_error in this example), only the very first one picks up the needed property. For example, this is what we get with the solution from the SO link above:

timestamp             user_id     event            ticke_type     error_type
2019-06-06 14:36:31   user_a      booking_error    ticke_b        error_c
2019-06-06 14:38:31   user_a      booking_error    ticke_h        error_d
2019-06-06 14:39:31   user_a      booking_error    NULL           error_e

To sum up: for a given row, how do we find the most recent previous row matching certain criteria and "cherry-pick" a property from it?

What's the best way to do this?
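To make the requirement concrete, here is the logic I want expressed as a plain-Python sketch (the merge_ticket_types helper is hypothetical, just to illustrate the semantics, not Spark code): a per-user forward fill of the last non-null ticket type, attached to each error row.

```python
# Sample rows in (timestamp, user_id, event, ticke_type, error_type) form.
rows = [
    ("2019-06-06 14:33:31", "user_a", "choose_ticket", "ticke_b", None),
    ("2019-06-06 14:34:31", "user_b", "choose_ticket", "ticke_f", None),
    ("2019-06-06 14:36:31", "user_a", "booking_error", None, "error_c"),
    ("2019-06-06 14:37:31", "user_a", "choose_ticket", "ticke_h", None),
    ("2019-06-06 14:38:31", "user_a", "booking_error", None, "error_d"),
    ("2019-06-06 14:39:31", "user_a", "booking_error", None, "error_e"),
]

def merge_ticket_types(rows):
    last_ticket = {}  # user_id -> most recent non-null ticke_type
    out = []
    for ts, user, event, ticket, error in sorted(rows):  # sort by timestamp
        if ticket is not None:
            last_ticket[user] = ticket  # remember the latest ticket type
        if event == "booking_error":
            # attach the carried-forward ticket type to the error row
            out.append((ts, user, event, last_ticket.get(user), error))
    return out

for row in merge_ticket_types(rows):
    print(row)
```

The question is how to express this same carry-forward efficiently in Spark.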

Upvotes: 3

Views: 8380

Answers (2)

howie

Reputation: 2685

Here is a PySpark version:

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, last, when

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [('2019-06-06 14:33:31', 'user_a', 'choose_ticket', 'ticke_b', None),
     ('2019-06-06 14:34:31', 'user_b', 'choose_ticket', 'ticke_f', None),
     ('2019-06-06 14:36:31', 'user_a', 'booking_error', None, 'error_c'),
     ('2019-06-06 14:37:31', 'user_a', 'choose_ticket', 'ticke_h', None),
     ('2019-06-06 14:38:31', 'user_a', 'booking_error', None, 'error_d'),
     ('2019-06-06 14:39:31', 'user_a', 'booking_error', None, 'error_e')],
    ("timestamp", "user_id", "event", "ticke_type", "error_type"))

df.show()

# Per-user window ordered by time; the default frame
# (unboundedPreceding..currentRow) makes last() look backwards.
window_spec = Window.partitionBy(col("user_id")).orderBy(col("timestamp"))

df = (df.withColumn('ticke_type_forwardfill',
                    when(col("event") == "choose_ticket", col("ticke_type"))
                    .otherwise(last("ticke_type", True).over(window_spec)))
        .drop(col("ticke_type"))
        .filter(col("event") == "booking_error"))

df.show()

Result:

+-------------------+-------+-------------+----------+----------+
|          timestamp|user_id|        event|ticke_type|error_type|
+-------------------+-------+-------------+----------+----------+
|2019-06-06 14:33:31| user_a|choose_ticket|   ticke_b|      null|
|2019-06-06 14:34:31| user_b|choose_ticket|   ticke_f|      null|
|2019-06-06 14:36:31| user_a|booking_error|      null|   error_c|
|2019-06-06 14:37:31| user_a|choose_ticket|   ticke_h|      null|
|2019-06-06 14:38:31| user_a|booking_error|      null|   error_d|
|2019-06-06 14:39:31| user_a|booking_error|      null|   error_e|
+-------------------+-------+-------------+----------+----------+

+-------------------+-------+-------------+----------+----------------------+
|          timestamp|user_id|        event|error_type|ticke_type_forwardfill|
+-------------------+-------+-------------+----------+----------------------+
|2019-06-06 14:36:31| user_a|booking_error|   error_c|               ticke_b|
|2019-06-06 14:38:31| user_a|booking_error|   error_d|               ticke_h|
|2019-06-06 14:39:31| user_a|booking_error|   error_e|               ticke_h|
+-------------------+-------+-------------+----------+----------------------+



Upvotes: 1

C.S.Reddy Gadipally

Reputation: 1758

org.apache.spark.sql.functions.last is what you are looking for. Because the default window frame runs from the start of the partition to the current row, last with ignoreNulls = true returns the most recent non-null ticke_type seen so far for each user. You can rename the "closest" column to replace ticke_type at the end.

scala> df.show
+-------------------+-------+-------------+----------+----------+
|          timestamp|user_id|        event|ticke_type|error_type|
+-------------------+-------+-------------+----------+----------+
|2019-06-06 14:33:31| user_a|choose_ticket|   ticke_b|      null|
|2019-06-06 14:34:31| user_b|choose_ticket|   ticke_f|      null|
|2019-06-06 14:36:31| user_a|booking_error|      null|   error_c|
|2019-06-06 14:37:31| user_a|choose_ticket|   ticke_h|      null|
|2019-06-06 14:38:31| user_a|booking_error|      null|   error_d|
|2019-06-06 14:39:31| user_a|booking_error|      null|   error_e|
+-------------------+-------+-------------+----------+----------+

scala> val overColumns = Window.partitionBy("user_id").orderBy("timestamp")
overColumns: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@70dc8c9a

scala> df.withColumn("closest", 
  org.apache.spark.sql.functions.last("ticke_type", true).over(overColumns)).filter($"event" === "booking_error").show
+-------------------+-------+-------------+----------+----------+-------+
|          timestamp|user_id|        event|ticke_type|error_type|closest|
+-------------------+-------+-------------+----------+----------+-------+
|2019-06-06 14:36:31| user_a|booking_error|      null|   error_c|ticke_b|
|2019-06-06 14:38:31| user_a|booking_error|      null|   error_d|ticke_h|
|2019-06-06 14:39:31| user_a|booking_error|      null|   error_e|ticke_h|
+-------------------+-------+-------------+----------+----------+-------+

Upvotes: 6
