Reputation: 719
My input Spark dataframe is:
Date Client Values Ratios
2020-10-26 1 NULL 0.21
2020-10-27 1 NULL 0.1
2020-10-28 1 NULL 0.25
2020-10-29 1 6 0.3
2020-10-30 1 NULL 0.4
2020-10-31 1 NULL 0.5
2020-11-01 1 1 0.3
2020-11-02 1 NULL 0.13
2020-11-03 1 NULL 0.67
2020-11-04 1 NULL 0.54
2020-11-05 1 NULL 0.2
2020-11-06 1 NULL 0.21
2020-11-07 1 2 0.7
2020-11-08 1 9 0.75
2020-11-09 1 NULL 0.9
2020-10-26 2 NULL 0.71
2020-10-27 2 NULL 0.19
2020-10-28 2 NULL 0.3
2020-10-29 2 10 0.34
2020-10-30 2 6 0.35
2020-10-31 2 NULL 0.93
2020-11-01 2 NULL 0.45
2020-11-02 2 NULL 0.43
2020-11-03 2 NULL 0.09
2020-11-04 2 NULL 0.39
2020-11-05 2 3 0.41
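In case it helps to reproduce the input, a minimal snippet along these lines builds it (first rows only; the schema of string dates, integer Values and double Ratios is an assumption):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# First few rows only; None stands for the NULL cells shown above
data = [
    ("2020-10-26", 1, None, 0.21),
    ("2020-10-27", 1, None, 0.1),
    ("2020-10-28", 1, None, 0.25),
    ("2020-10-29", 1, 6, 0.3),
    # ... remaining rows for clients 1 and 2 ...
]
df = spark.createDataFrame(data, "Date string, Client int, Values int, Ratios double")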
I want to create a "Ratios_latest" column. For each client, I need to look as many rows ahead as given in the "Values" column and take the "Ratios" value found there; that becomes the "Ratios_latest" value. Here is the desired output based on the data above:
Date Client Values Ratios Ratios_latest
2020-10-26 1 NULL 0.21 NULL
2020-10-27 1 NULL 0.1 NULL
2020-10-28 1 NULL 0.25 NULL
2020-10-29 1 6 0.3 0.54 -> the "Ratios" value 6 rows later is 0.54
2020-10-30 1 NULL 0.4 NULL
2020-10-31 1 NULL 0.5 NULL
2020-11-01 1 1 0.3 0.13 -> the "Ratios" value 1 row later is 0.13
2020-11-02 1 NULL 0.13 NULL
2020-11-03 1 NULL 0.67 NULL
2020-11-04 1 NULL 0.54 NULL
2020-11-05 1 NULL 0.2 NULL
2020-11-06 1 NULL 0.21 NULL
2020-11-07 1 2 0.7 0.9 -> the "Ratios" value 2 rows later is 0.9
2020-11-08 1 9 0.75 NULL -> NULL, because the lookup is per client and there is no row 9 rows later for this client
2020-11-09 1 NULL 0.9 NULL
2020-10-26 2 NULL 0.71 NULL
2020-10-27 2 NULL 0.19 NULL
2020-10-28 2 NULL 0.3 NULL
2020-10-29 2 10 0.34 0.98 -> the "Ratios" value 10 rows later is 0.98
2020-10-30 2 6 0.35 0.41 -> the "Ratios" value 6 rows later is 0.41
2020-10-31 2 NULL 0.93 NULL
2020-11-01 2 NULL 0.45 NULL
2020-11-02 2 NULL 0.43 NULL
2020-11-03 2 NULL 0.09 NULL
2020-11-04 2 NULL 0.39 NULL
2020-11-05 2 3 0.41 NULL
Could you please help me with this?
Upvotes: 0
Views: 60
Reputation: 42352
A tricky case with a variable offset, which is not supported by lead but can be solved using collect_list:
import pyspark.sql.functions as F

df2 = df.withColumn(
    'Ratios_latest',
    # collect the Ratios of the current row and all following rows (per client, ordered by date) into an array
    F.expr('collect_list(Ratios) over (partition by Client order by Date rows between current row and unbounded following)')
).withColumn(
    'Ratios_latest',
    # the array is 0-based and starts at the current row, so index [Values] is the Ratios value `Values` rows later;
    # a null Values or an index past the end of the client's rows yields null
    F.expr('Ratios_latest[Values]')
)
df2.show(99)
+----------+------+------+------+-------------+
| Date|Client|Values|Ratios|Ratios_latest|
+----------+------+------+------+-------------+
|2020-10-26| 1| null| 0.21| null|
|2020-10-27| 1| null| 0.1| null|
|2020-10-28| 1| null| 0.25| null|
|2020-10-29| 1| 6| 0.3| 0.54|
|2020-10-30| 1| null| 0.4| null|
|2020-10-31| 1| null| 0.5| null|
|2020-11-01| 1| 1| 0.3| 0.13|
|2020-11-02| 1| null| 0.13| null|
|2020-11-03| 1| null| 0.67| null|
|2020-11-04| 1| null| 0.54| null|
|2020-11-05| 1| null| 0.2| null|
|2020-11-06| 1| null| 0.21| null|
|2020-11-07| 1| 2| 0.7| 0.9|
|2020-11-08| 1| 9| 0.75| null|
|2020-11-09| 1| null| 0.9| null|
|2020-10-26| 2| null| 0.71| null|
|2020-10-27| 2| null| 0.19| null|
|2020-10-28| 2| null| 0.3| null|
|2020-10-29| 2| 10| 0.34| null|
|2020-10-30| 2| 6| 0.35| 0.41|
|2020-10-31| 2| null| 0.93| null|
|2020-11-01| 2| null| 0.45| null|
|2020-11-02| 2| null| 0.43| null|
|2020-11-03| 2| null| 0.09| null|
|2020-11-04| 2| null| 0.39| null|
|2020-11-05| 2| 3| 0.41| null|
+----------+------+------+------+-------------+
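An alternative, if you would rather avoid collecting an array per row on long partitions, is to number the rows per client and left-join each row to the row Values positions later; a rough sketch (untested against your data):
from pyspark.sql import functions as F, Window

w = Window.partitionBy('Client').orderBy('Date')
numbered = df.withColumn('rn', F.row_number().over(w))

# The same rows again, exposing only the columns needed from the "future" side of the join
future = numbered.select(
    F.col('Client').alias('Client_f'),
    F.col('rn').alias('rn_f'),
    F.col('Ratios').alias('Ratios_latest'),
)

# Each row joins to the row `Values` positions later within the same client;
# rows with a null Values, or pointing past the client's last row, get no match and stay null
df2 = (
    numbered
    .join(
        future,
        (numbered['Client'] == future['Client_f']) & (numbered['rn'] + numbered['Values'] == future['rn_f']),
        'left',
    )
    .drop('Client_f', 'rn_f', 'rn')
)
df2.show(99)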
Upvotes: 2