Reputation: 33036
We have a time series database for user events, which looks like the following:
timestamp user_id event ticke_type error_type
2019-06-06 14:33:31 user_a choose_ticket ticke_b NULL
2019-06-06 14:34:31 user_b choose_ticket ticke_f NULL
2019-06-06 14:36:31 user_a booing_error NULL error_c
2019-06-06 14:37:31 user_a choose_ticket ticke_h NULL
2019-06-06 14:38:31 user_a booing_error NULL error_d
2019-06-06 14:39:31 user_a booing_error NULL error_e
Here is one use case we need to support:
In order to investigate which ticket type is causing a booking error, we have to look at the ticket type, which is available only on the earlier choose_ticket event.
In other words, for each booking_error event we want to find the previous choose_ticket event for the same user and merge its ticket type into the booking_error event.
So ideally, the output that we want is:
timestamp user_id event ticke_type error_type
2019-06-06 14:36:31 user_a booing_error ticke_b error_c
2019-06-06 14:38:31 user_a booing_error ticke_h error_d
2019-06-06 14:39:31 user_a booing_error ticke_h error_e
The closest thing I can find is Spark add new column to dataframe with value from previous row, which lets us take a property from the previous event and apply it to the event right after.
This almost works, except that when several booing_error events follow a single choose_ticket event, only the first of them picks up the needed property.
E.g., this is what we would get with the solution from the SO link above (sketched after the table below):
timestamp user_id event ticke_type error_type
2019-06-06 14:36:31 user_a booing_error ticke_b error_c
2019-06-06 14:38:31 user_a booing_error ticke_h error_d
2019-06-06 14:39:31 user_a booing_error NULL error_e
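Roughly, that linked solution boils down to a lag-based approach along these lines (my paraphrase, not the exact code from that post; column names as in the sample data above). Because lag only reaches exactly one row back, a booing_error row that directly follows another booing_error row still ends up with a NULL ticket type, which is the gap in the last row above.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, col, lag}

// Per-user window, ordered by event time.
val w = Window.partitionBy("user_id").orderBy("timestamp")

// lag(..., 1) only looks at the immediately preceding row, so only the first
// booing_error after a choose_ticket picks up a ticket type.
val filled = df.withColumn(
  "ticke_type",
  coalesce(col("ticke_type"), lag("ticke_type", 1).over(w)))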
To sum up: for a given row, how do I find the most recent previous row matching certain criteria and "cherry-pick" its property over?
What's the best way to do this?
Upvotes: 3
Views: 8380
Reputation: 2685
Here is the PySpark version:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, last, when

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [('2019-06-06 14:33:31', 'user_a', 'choose_ticket', 'ticke_b', None),
     ('2019-06-06 14:34:31', 'user_b', 'choose_ticket', 'ticke_f', None),
     ('2019-06-06 14:36:31', 'user_a', 'booing_error', None, 'error_c'),
     ('2019-06-06 14:37:31', 'user_a', 'choose_ticket', 'ticke_h', None),
     ('2019-06-06 14:38:31', 'user_a', 'booing_error', None, 'error_d'),
     ('2019-06-06 14:39:31', 'user_a', 'booing_error', None, 'error_e'),
    ],
    ("timestamp", "user_id", "event", "ticke_type", "error_type"))
df.show()

# Per-user window ordered by time; the default frame runs from the start of the
# partition up to the current row, so last(..., True) returns the most recent
# non-null ticke_type seen so far (a forward fill).
window_spec = Window.partitionBy(col("user_id")).orderBy(col("timestamp"))

df = (df.withColumn('ticke_type_forwardfill',
                    when(col("event") == "choose_ticket", col("ticke_type"))
                    .otherwise(last("ticke_type", True).over(window_spec)))
        .drop(col("ticke_type"))
        .filter(col("event") == "booing_error"))
df.show()
Result:
+-------------------+-------+-------------+----------+----------+
| timestamp|user_id| event|ticke_type|error_type|
+-------------------+-------+-------------+----------+----------+
|2019-06-06 14:33:31| user_a|choose_ticket| ticke_b| null|
|2019-06-06 14:34:31| user_b|choose_ticket| ticke_f| null|
|2019-06-06 14:36:31| user_a| booing_error| null| error_c|
|2019-06-06 14:37:31| user_a|choose_ticket| ticke_h| null|
|2019-06-06 14:38:31| user_a| booing_error| null| error_d|
|2019-06-06 14:39:31| user_a| booing_error| null| error_e|
+-------------------+-------+-------------+----------+----------+
+-------------------+-------+------------+----------+----------------------+
| timestamp|user_id| event|error_type|ticke_type_forwardfill|
+-------------------+-------+------------+----------+----------------------+
|2019-06-06 14:36:31| user_a|booing_error| error_c| ticke_b|
|2019-06-06 14:38:31| user_a|booing_error| error_d| ticke_h|
|2019-06-06 14:39:31| user_a|booing_error| error_e| ticke_h|
+-------------------+-------+------------+----------+----------------------+
Upvotes: 1
Reputation: 1758
org.apache.spark.sql.functions.last
is what you are looking for. You can rename the "closest" column to replace ticke_type at the end (a sketch of that final step follows the output below).
scala> df.show
+-------------------+-------+-------------+----------+----------+
| timestamp|user_id| event|ticke_type|error_type|
+-------------------+-------+-------------+----------+----------+
|2019-06-06 14:33:31| user_a|choose_ticket| ticke_b| null|
|2019-06-06 14:34:31| user_b|choose_ticket| ticke_f| null|
|2019-06-06 14:36:31| user_a|booking_error| null| error_c|
|2019-06-06 14:37:31| user_a|choose_ticket| ticke_h| null|
|2019-06-06 14:38:31| user_a|booking_error| null| error_d|
|2019-06-06 14:39:31| user_a|booking_error| null| error_e|
+-------------------+-------+-------------+----------+----------+
scala> val overColumns = Window.partitionBy("user_id").orderBy("timestamp")
overColumns: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@70dc8c9a
scala> df.withColumn("closest",
org.apache.spark.sql.functions.last("ticke_type", true).over(overColumns)).filter($"event" === "booking_error").show
+-------------------+-------+-------------+----------+----------+-------+
| timestamp|user_id| event|ticke_type|error_type|closest|
+-------------------+-------+-------------+----------+----------+-------+
|2019-06-06 14:36:31| user_a|booking_error| null| error_c|ticke_b|
|2019-06-06 14:38:31| user_a|booking_error| null| error_d|ticke_h|
|2019-06-06 14:39:31| user_a|booking_error| null| error_e|ticke_h|
+-------------------+-------+-------------+----------+----------+-------+
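For completeness, the rename mentioned above could look roughly like this (just one way to finish up, reusing overColumns from the snippet above):
val result = df
  .withColumn("closest",
    org.apache.spark.sql.functions.last("ticke_type", true).over(overColumns))
  .filter($"event" === "booking_error")
  .drop("ticke_type")
  .withColumnRenamed("closest", "ticke_type")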
Upvotes: 6