Reputation: 2385
I have a PySpark DataFrame and I would like to get the second highest value of ORDERED_TIME (a DateTime field in yyyy-mm-dd format) after a groupBy applied to two columns, namely CUSTOMER_ID and ADDRESS_ID.
A customer can have many orders associated with an address, and I would like to get the second most recent order for a (customer, address) pair.
My approach was to make a window, partition by CUSTOMER_ID and ADDRESS_ID, and sort by ORDERED_TIME:
sorted_order_times = Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID").orderBy(col('ORDERED_TIME').desc())
df2 = df2.withColumn("second_recent_order", (df2.select("ORDERED_TIME").collect()[1]).over(sorted_order_times))
However, I get an error saying ValueError: 'over' is not in list
Could anyone suggest the right way to go about solving this problem?
Please let me know if any other information is needed.
Sample Data
+-----------+----------+-------------------+
|USER_ID    |ADDRESS_ID| ORDER DATE        |
+-----------+----------+-------------------+
|        100|      1000|2021-01-02         |
|        100|      1000|2021-01-14         |
|        100|      1000|2021-01-03         |
|        100|      1000|2021-01-04         |
|        101|      2000|2020-05-07         |
|        101|      2000|2021-04-14         |
+-----------+----------+-------------------+
Expected Output
+-----------+----------+-------------------+-------------------+
|USER_ID    |ADDRESS_ID| ORDER DATE        |second_recent_order|
+-----------+----------+-------------------+-------------------+
|        100|      1000|2021-01-02         |2021-01-04         |
|        100|      1000|2021-01-14         |2021-01-04         |
|        100|      1000|2021-01-03         |2021-01-04         |
|        100|      1000|2021-01-04         |2021-01-04         |
|        101|      2000|2020-05-07         |2020-05-07         |
|        101|      2000|2021-04-14         |2020-05-07         |
+-----------+----------+-------------------+-------------------+
Upvotes: 5
Views: 6663
Reputation: 7207
Two windows can be used: a sorted one to get the rows in the proper order, and an unsorted one combined with the first function to get the second row (Scala):
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._  // already in scope in spark-shell

val df2 = Seq(
  (100, 158932441, "2021-01-02 13:35:57"),
  (100, 158932441, "2021-01-14 19:14:08"),
  (100, 158932441, "2021-01-03 13:33:52"),
  (100, 158932441, "2021-01-04 09:36:10"),
  (101, 281838494, "2020-05-07 13:35:57"),
  (101, 281838494, "2021-04-14 19:14:08")
).toDF("CUSTOMER_ID", "ADDRESS_ID", "ORDERED_TIME")

// sorted window to number the rows within each group
val sorted_order_times = Window
  .partitionBy("CUSTOMER_ID", "ADDRESS_ID")
  .orderBy(desc("ORDERED_TIME"))

// unsorted window so every row of the group sees the value from row number 2
val unsorted_order_times = Window
  .partitionBy("CUSTOMER_ID", "ADDRESS_ID")

df2
  .withColumn("row_number", row_number().over(sorted_order_times))
  .withColumn("second_recent_order",
    first(
      when($"row_number" === lit(2), $"ORDERED_TIME").otherwise(null), true
    ).over(unsorted_order_times))
  .drop("row_number")
  .show(false)
Output:
+-----------+----------+-------------------+-------------------+
|CUSTOMER_ID|ADDRESS_ID|ORDERED_TIME |second_recent_order|
+-----------+----------+-------------------+-------------------+
|100 |158932441 |2021-01-14 19:14:08|2021-01-04 09:36:10|
|100 |158932441 |2021-01-04 09:36:10|2021-01-04 09:36:10|
|100 |158932441 |2021-01-03 13:33:52|2021-01-04 09:36:10|
|100 |158932441 |2021-01-02 13:35:57|2021-01-04 09:36:10|
|101 |281838494 |2021-04-14 19:14:08|2020-05-07 13:35:57|
|101 |281838494 |2020-05-07 13:35:57|2020-05-07 13:35:57|
+-----------+----------+-------------------+-------------------+
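Since the question is in PySpark, a rough translation of the same two-window idea could look like this (an untested sketch, assuming the DataFrame is named df2 with the same column names as above):
import pyspark.sql.functions as F
from pyspark.sql import Window

# sorted window to number the rows, unsorted window so every row of the
# group can read the value found on row number 2
sorted_order_times = Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID").orderBy(F.desc("ORDERED_TIME"))
unsorted_order_times = Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID")

df2 = (
    df2
    .withColumn("row_number", F.row_number().over(sorted_order_times))
    .withColumn(
        "second_recent_order",
        F.first(
            F.when(F.col("row_number") == 2, F.col("ORDERED_TIME")),  # null on other rows
            ignorenulls=True,
        ).over(unsorted_order_times),
    )
    .drop("row_number")
)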
Upvotes: 0
Reputation: 726
You can use a window here in the following way, but you will get null if there is only one row in a group:
from pyspark.sql import Window
from pyspark.sql.functions import collect_list, desc

sorted_order_times = Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID").orderBy(desc('ORDERED_TIME')).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df2 = df2.withColumn(
    "second_recent_order",
    collect_list("ORDERED_TIME").over(sorted_order_times).getItem(1)
)
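If that null is unwanted and you would rather fall back to the only (most recent) order in single-order groups, one possible tweak on top of the code above is to coalesce with the first element; whether that fallback is right depends on your use case (untested sketch):
from pyspark.sql.functions import coalesce

df2 = df2.withColumn(
    "second_recent_order",
    coalesce(
        collect_list("ORDERED_TIME").over(sorted_order_times).getItem(1),
        collect_list("ORDERED_TIME").over(sorted_order_times).getItem(0),  # lone-order fallback
    )
)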
Upvotes: 2
Reputation: 106
Here is another way to do it, using collect_list:
import pyspark.sql.functions as F
from pyspark.sql import Window
sorted_order_times = Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID").orderBy(F.col('ORDERED_TIME').desc()).rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df2 = (
    df
    .withColumn("second_recent_order", F.collect_list(F.col("ORDERED_TIME")).over(sorted_order_times)[1])
)
df2.show()
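As a side note, on Spark 3.1+ the nth_value window function can do the same job over this window without materializing the intermediate array; this is an alternative sketch, not part of the answer above:
df2 = (
    df
    .withColumn(
        "second_recent_order",
        F.nth_value(F.col("ORDERED_TIME"), 2).over(sorted_order_times)  # offset is 1-based
    )
)
df2.show()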
Upvotes: 5
Reputation: 9247
One solution is to create a lookup table with the second most recent order for every (CUSTOMER_ID, ADDRESS_ID) pair, and then join it with the original dataframe.
I assume that your ORDERED_TIME column is already a timestamp type.
import pyspark.sql.functions as F
from pyspark.sql.window import Window
# define window
w = Window().partitionBy('CUSTOMER_ID', 'ADDRESS_ID').orderBy(F.desc('ORDERED_TIME'))
# create lookup table
second_highest = df \
.withColumn('rank', F.dense_rank().over(w)) \
.filter(F.col('rank') == 2) \
.select('CUSTOMER_ID', 'ADDRESS_ID', 'ORDERED_TIME')
# join with original dataframe
df = df.join(second_highest, on=['CUSTOMER_ID', 'ADDRESS_ID'], how='left')
df.show()
+-----------+----------+-------------------+-------------------+
|CUSTOMER_ID|ADDRESS_ID| ORDERED_TIME| ORDERED_TIME|
+-----------+----------+-------------------+-------------------+
| 100| 158932441|2021-01-02 13:35:57|2021-01-04 09:36:10|
| 100| 158932441|2021-01-14 19:14:08|2021-01-04 09:36:10|
| 100| 158932441|2021-01-03 13:33:52|2021-01-04 09:36:10|
| 100| 158932441|2021-01-04 09:36:10|2021-01-04 09:36:10|
| 101| 281838494|2020-05-07 13:35:57|2020-05-07 13:35:57|
| 101| 281838494|2021-04-14 19:14:08|2020-05-07 13:35:57|
+-----------+----------+-------------------+-------------------+
Note: in your expected output you wrote 2021-04-14 19:14:08 for CUSTOMER_ID == 101, but the second most recent order for that customer is actually 2020-05-07 13:35:57, the one from 2020.
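To end up with the second_recent_order column name from the expected output, rather than two columns both called ORDERED_TIME, the lookup column can be aliased before the join (a small, untested tweak to the code above):
# create lookup table with the target column name
second_highest = df \
    .withColumn('rank', F.dense_rank().over(w)) \
    .filter(F.col('rank') == 2) \
    .select('CUSTOMER_ID', 'ADDRESS_ID', F.col('ORDERED_TIME').alias('second_recent_order'))

# join with original dataframe
df = df.join(second_highest, on=['CUSTOMER_ID', 'ADDRESS_ID'], how='left')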
Upvotes: 0