Jitesh Malipeddi

Reputation: 2385

How to get the second highest value from a column in PySpark?

I have a PySpark DataFrame and I would like to get the second highest value of ORDERED_TIME (a datetime field in yyyy-MM-dd format) after grouping by two columns, CUSTOMER_ID and ADDRESS_ID.

A customer can have many orders associated with an address, and I would like to get the second most recent order for a (customer, address) pair.

My approach was to make a window, partition it by CUSTOMER_ID and ADDRESS_ID, and sort it by ORDERED_TIME in descending order:

sorted_order_times = Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID").orderBy(col('ORDERED_TIME').desc())

df2 = df2.withColumn("second_recent_order", (df2.select("ORDERED_TIME").collect()[1]).over(sorted_order_times))

However, I get an error saying ValueError: 'over' is not in list.

Could anyone suggest the right way to go about solving this problem?

Please let me know if any other information is needed.

Sample Data

+-----------+----------+------------+
|CUSTOMER_ID|ADDRESS_ID|ORDERED_TIME|
+-----------+----------+------------+
|        100|      1000|  2021-01-02|
|        100|      1000|  2021-01-14|
|        100|      1000|  2021-01-03|
|        100|      1000|  2021-01-04|
|        101|      2000|  2020-05-07|
|        101|      2000|  2021-04-14|
+-----------+----------+------------+

Expected Output

+-----------+----------+------------+-------------------+
|CUSTOMER_ID|ADDRESS_ID|ORDERED_TIME|second_recent_order|
+-----------+----------+------------+-------------------+
|        100|      1000|  2021-01-02|         2021-01-04|
|        100|      1000|  2021-01-14|         2021-01-04|
|        100|      1000|  2021-01-03|         2021-01-04|
|        100|      1000|  2021-01-04|         2021-01-04|
|        101|      2000|  2020-05-07|         2020-05-07|
|        101|      2000|  2021-04-14|         2020-05-07|
+-----------+----------+------------+-------------------+
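
For reference, the sample data above can be reproduced with something like this (a sketch; the column types are assumed):

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# build the sample DataFrame from the table in the question
df2 = spark.createDataFrame(
    [
        (100, 1000, "2021-01-02"),
        (100, 1000, "2021-01-14"),
        (100, 1000, "2021-01-03"),
        (100, 1000, "2021-01-04"),
        (101, 2000, "2020-05-07"),
        (101, 2000, "2021-04-14"),
    ],
    ["CUSTOMER_ID", "ADDRESS_ID", "ORDERED_TIME"],
).withColumn("ORDERED_TIME", F.to_date("ORDERED_TIME"))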

Upvotes: 5

Views: 6663

Answers (4)

pasha701

Reputation: 7207

Two windows can be used: a sorted one to get the rows in the proper order, and an unsorted one combined with the first function to pull out the second row (Scala):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val df2 = Seq(
  (100, 158932441, "2021-01-02 13:35:57"),
  (100, 158932441, "2021-01-14 19:14:08"),
  (100, 158932441, "2021-01-03 13:33:52"),
  (100, 158932441, "2021-01-04 09:36:10"),
  (101, 281838494, "2020-05-07 13:35:57"),
  (101, 281838494, "2021-04-14 19:14:08")
).toDF("CUSTOMER_ID", "ADDRESS_ID", "ORDERED_TIME")

// sorted window: most recent order first within each (customer, address) group
val sorted_order_times = Window
  .partitionBy("CUSTOMER_ID", "ADDRESS_ID")
  .orderBy(desc("ORDERED_TIME"))

// unsorted window: spans the whole group, so first() can spread the tagged row
val unsorted_order_times = Window
  .partitionBy("CUSTOMER_ID", "ADDRESS_ID")

df2
  .withColumn("row_number", row_number().over(sorted_order_times))
  .withColumn("second_recent_order",
    first(
      when($"row_number" === lit(2), $"ORDERED_TIME").otherwise(null),
      ignoreNulls = true
    ).over(unsorted_order_times))
  .drop("row_number")

Output:

+-----------+----------+-------------------+-------------------+
|CUSTOMER_ID|ADDRESS_ID|ORDERED_TIME       |second_recent_order|
+-----------+----------+-------------------+-------------------+
|100        |158932441 |2021-01-14 19:14:08|2021-01-04 09:36:10|
|100        |158932441 |2021-01-04 09:36:10|2021-01-04 09:36:10|
|100        |158932441 |2021-01-03 13:33:52|2021-01-04 09:36:10|
|100        |158932441 |2021-01-02 13:35:57|2021-01-04 09:36:10|
|101        |281838494 |2021-04-14 19:14:08|2020-05-07 13:35:57|
|101        |281838494 |2020-05-07 13:35:57|2020-05-07 13:35:57|
+-----------+----------+-------------------+-------------------+

Upvotes: 0

Artem Astashov

Reputation: 726

You can use a window here in the following way, but note that you will get null if a group contains only one row:


from pyspark.sql import Window
from pyspark.sql.functions import collect_list, desc

# window spanning the whole (customer, address) group, most recent order first
sorted_order_times = (
    Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID")
    .orderBy(desc("ORDERED_TIME"))
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

df2 = df2.withColumn(
    "second_recent_order",
    collect_list("ORDERED_TIME").over(sorted_order_times).getItem(1)
)
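
If a null is not acceptable for single-order groups, one option (an assumed preference, not part of the original answer) is to fall back to the most recent order with coalesce:

from pyspark.sql.functions import coalesce

# reuses sorted_order_times from above; getItem(0) is the most recent order,
# used as a fallback when the group has no second order
df2 = df2.withColumn(
    "second_recent_order",
    coalesce(
        collect_list("ORDERED_TIME").over(sorted_order_times).getItem(1),
        collect_list("ORDERED_TIME").over(sorted_order_times).getItem(0),
    ),
)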

Upvotes: 2

pradeep

Reputation: 106

Here is another way to do it, using collect_list:

import pyspark.sql.functions as F
from pyspark.sql import Window


# window spanning the whole (customer, address) group, most recent order first
sorted_order_times = (
    Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID")
    .orderBy(F.col("ORDERED_TIME").desc())
    .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

df2 = df.withColumn(
    "second_recent_order",
    F.collect_list(F.col("ORDERED_TIME")).over(sorted_order_times)[1]
)
df2.show()

The final output matches the expected output shown in the question.
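
On Spark 2.4+ the same lookup can also be written with element_at (a variation on this answer, not part of it):

# reuses F and sorted_order_times from the snippet above;
# element_at is 1-based, so index 2 is the second most recent order
df2 = df.withColumn(
    "second_recent_order",
    F.element_at(F.collect_list("ORDERED_TIME").over(sorted_order_times), 2),
)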

Upvotes: 5

Ric S

Reputation: 9247

One solution is to create a lookup table with the second most recent order for every (CUSTOMER_ID, ADDRESS_ID) pair, and then join it back to the original dataframe.
I assume that your ORDERED_TIME column is already a timestamp type.
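
If it is still a string, it can be cast first (a sketch; the format is assumed from the sample data):

import pyspark.sql.functions as F

# assumption: ORDERED_TIME arrives as a string such as '2021-01-02 13:35:57'
df = df.withColumn('ORDERED_TIME', F.to_timestamp('ORDERED_TIME'))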

import pyspark.sql.functions as F
from pyspark.sql.window import Window

# define window
w = Window().partitionBy('CUSTOMER_ID', 'ADDRESS_ID').orderBy(F.desc('ORDERED_TIME'))

# create lookup table; alias the column so the join result doesn't
# end up with two columns named ORDERED_TIME
second_highest = df \
  .withColumn('rank', F.dense_rank().over(w)) \
  .filter(F.col('rank') == 2) \
  .select('CUSTOMER_ID', 'ADDRESS_ID', F.col('ORDERED_TIME').alias('second_recent_order'))

# join with original dataframe
df = df.join(second_highest, on=['CUSTOMER_ID', 'ADDRESS_ID'], how='left')

df.show()

+-----------+----------+-------------------+-------------------+
|CUSTOMER_ID|ADDRESS_ID|       ORDERED_TIME|second_recent_order|
+-----------+----------+-------------------+-------------------+
|        100| 158932441|2021-01-02 13:35:57|2021-01-04 09:36:10|
|        100| 158932441|2021-01-14 19:14:08|2021-01-04 09:36:10|
|        100| 158932441|2021-01-03 13:33:52|2021-01-04 09:36:10|
|        100| 158932441|2021-01-04 09:36:10|2021-01-04 09:36:10|
|        101| 281838494|2020-05-07 13:35:57|2020-05-07 13:35:57|
|        101| 281838494|2021-04-14 19:14:08|2020-05-07 13:35:57|
+-----------+----------+-------------------+-------------------+

Note: in your expected output you wrote 2021-04-14 19:14:08 for CUSTOMER_ID == 101, but it should actually be 2020-05-07 13:35:57, because that order is from 2020 and is therefore the second most recent one.

Upvotes: 0
