Reputation: 181
I want to create a column in PySpark that refers to itself after the first row.
Customer | Week | Price | Index change | Column to be created
A        | 1    | 10    | 0.5          | 10
A        | 2    | 13    | 0.1          | 10 * (1 + 0.1) = 11
A        | 3    | 16    | 0.6          | 11 * (1 + 0.6) = 17.6
A        | 4    | 16    | 0.1          | 17.6 * (1 + 0.1) = 19.36
There are multiple customers in this dataset, each with 52 weeks. I know I have to use a window function, but I am having trouble combining it with a column that essentially refers to itself after the first row (and to another column). I feel like it should be something like the code below, but I am not sure how to make it work, or whether you can even refer to a column while it is being created:
df = df.withColumn('Column to be created',
                   F.when(F.col('week') != 1,
                          F.lag(df['Column to be created']).over(win) * (1 + df['Index change']))
                    .otherwise(F.col('Price')))
* win refers to a partitionBy window that I have already created
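For context, the actual definition of win is not shown; a minimal sketch of what it presumably looks like (partitioned by customer, ordered by week; the ordering column is an assumption based on the sample data):
from pyspark.sql import Window

# assumed shape of the window referenced as `win` above
win = Window.partitionBy('Customer').orderBy('Week')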
Upvotes: 3
Views: 544
Reputation: 5880
As I understand it, you are trying to compound the price based on the index changes. Also, note that we can't reference a new column before it has been created. I tried it my way; hope this helps.
dff = spark.createDataFrame([('A', 1, 10, 0.5), ('A', 2, 13, 0.1),
                             ('A', 3, 16, 0.6), ('A', 4, 16, 0.1)],
                            ['Customer', 'Week', 'Price', 'Index_change'])
dff.show()
+--------+----+-----+------------+
|Customer|Week|Price|Index_change|
+--------+----+-----+------------+
| A| 1| 10| 0.5|
| A| 2| 13| 0.1|
| A| 3| 16| 0.6|
| A| 4| 16| 0.1|
+--------+----+-----+------------+
from pyspark.sql import Window
from pyspark.sql import functions as F
w = Window.partitionBy('Customer').orderBy('Week').rowsBetween(Window.unboundedPreceding, 0)

# 2nd row: 10*(1+0.1), 3rd row: 10*(1+0.1)*(1+0.6), 4th row: 10*(1+0.1)*(1+0.6)*(1+0.1), and so on.
# So for the 3rd row you need the cumulative product of (Index_change + 1). Since log(a*b) = log(a) + log(b),
# log_sum for the 3rd row is log(1+0.1) + log(1+0.6).
# cum_idx converts the result back from log space with exp().
log_sum = F.sum(F.when(F.col('Week') != 1, F.log(F.col('Index_change') + 1))).over(w)  # sum of logs = product of factors
cum_idx = F.exp(log_sum)               # back to the original scale
base_value = F.first('Price').over(w)  # week-1 price of each customer
dff = dff.withColumn('new_column',
                     F.when(F.col('Week') != 1, cum_idx * base_value).otherwise(F.col('Price')))
dff.show()
+--------+----+-----+------------+------------------+
|Customer|Week|Price|Index_change| new_column|
+--------+----+-----+------------+------------------+
| A| 1| 10| 0.5| 10.0|
| A| 2| 13| 0.1| 11.0|
| A| 3| 16| 0.6| 17.6|
| A| 4| 16| 0.1| 19.36|
+--------+----+-----+------------+------------------+
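As a quick sanity check of the log/exp trick (plain Python, outside Spark), the week-4 factors multiply out to the same cumulative index:
import math

factors = [1.1, 1.6, 1.1]                    # (Index_change + 1) for weeks 2-4
log_sum = sum(math.log(f) for f in factors)  # what F.sum(F.log(...)) computes over the window
print(math.exp(log_sum))                     # ~1.936, and 10 * 1.936 = 19.36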
Upvotes: 4
Reputation: 3224
You can use a UDF and two window functions:
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType

data = sc.parallelize([
('A', 1, 10, 0.5),
('A', 2, 13, 0.1),
('A', 3, 16, 0.6),
('A', 4, 16, 0.1),
('B', 1, 10, 0.5),
('B', 2, 13, 0.1),
('B', 3, 16, 0.6),
('B', 4, 16, 0.1),
])
df = spark.createDataFrame(data, ['Customer', 'Week', 'Price', 'Index change'])
window1 = Window.partitionBy('Customer').orderBy('week')
window2 = Window.partitionBy('Customer').orderBy('week').rangeBetween(Window.unboundedPreceding, 0)
from functools import reduce
@F.udf(FloatType())
def mul_list(l):
    # skip the first factor (week 1) and multiply the remaining ones
    if len(l) == 1:
        return None
    else:
        return reduce(lambda x, y: x * y, l[1:])
df.withColumn('new_col', F.collect_list(F.col('Index change') + 1).over(window2))\
  .withColumn('mult', mul_list('new_col'))\
  .withColumn('result', F.first(F.col('Price')).over(window1) * F.coalesce(F.col('mult'), F.lit(1))).show()
which results in
+--------+----+-----+------------+--------------------+-----+------+
|Customer|Week|Price|Index change| new_col| mult|result|
+--------+----+-----+------------+--------------------+-----+------+
| B| 1| 10| 0.5| [1.5]| null| 10.0|
| B| 2| 13| 0.1| [1.5, 1.1]| 1.1| 11.0|
| B| 3| 16| 0.6| [1.5, 1.1, 1.6]| 1.76| 17.6|
| B| 4| 16| 0.1|[1.5, 1.1, 1.6, 1.1]|1.936| 19.36|
| A| 1| 10| 0.5| [1.5]| null| 10.0|
| A| 2| 13| 0.1| [1.5, 1.1]| 1.1| 11.0|
| A| 3| 16| 0.6| [1.5, 1.1, 1.6]| 1.76| 17.6|
| A| 4| 16| 0.1|[1.5, 1.1, 1.6, 1.1]|1.936| 19.36|
+--------+----+-----+------------+--------------------+-----+------+
I created new columns to make the in-between steps more explicit.
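If you prefer to avoid the Python UDF, the same multiplication can be sketched with Spark SQL's aggregate higher-order function together with slice (an assumption: this requires Spark 2.4+, where both are available):
df.withColumn('new_col', F.collect_list(F.col('Index change') + 1).over(window2))\
  .withColumn('mult',
              F.when(F.size('new_col') > 1,
                     F.expr("aggregate(slice(new_col, 2, size(new_col) - 1), "
                            "CAST(1.0 AS DOUBLE), (acc, x) -> acc * x)")))\
  .withColumn('result', F.first(F.col('Price')).over(window1) * F.coalesce(F.col('mult'), F.lit(1)))\
  .show()
Like the UDF, this skips the week-1 factor and leaves mult null on the first row, so the result column should match the output shown above.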
Upvotes: 1