Reputation: 1
I am looking at the sliding window functionality for Spark DataFrames in Spark SQL. I have a DataFrame with the columns id, month, and volume.
id  month   volume  new_col
1   201601  100     0
1   201602  120     100
1   201603  450     220
1   201604  200     670
1   201605  121     870
Now I want to add a new column named new_col, where each row's value is the sum of the previous row's volume and the previous row's new_col, as shown above; the value of new_col in the first row is zero. Unrolling this recursion shows that new_col is simply the running total of all preceding volume values (for example, the fourth row's 670 = 100 + 120 + 450).
I tried the option below, using the window function lag in PySpark. But I found that the new_col column would have to be referenced recursively while it is being built, so lag alone cannot do this:
window = Window.partitionBy(F.col('id')).orderBy(F.col('month').asc())
# Fails: new_col does not exist yet when lag tries to read it
df.withColumn('new_col', F.lag(F.col('volume'), 1).over(window) + F.lag(F.col('new_col'), 1).over(window))
Is there a way to dynamically lag new_col using window functions? Or is there another good solution?
Upvotes: 0
Views: 4196
Reputation: 2200
You can use nested window functions:
>>> from pyspark.sql.window import Window
>>> import pyspark.sql.functions as F
>>>
>>> data = sc.parallelize([
... (1,'201601',100),
... (1,'201602',120),
... (1,'201603',450),
... (1,'201604',200),
... (1,'201605',121)])
>>> cols = ['id', 'month', 'volume']
>>>
>>> df = spark.createDataFrame(data, cols)
>>> df.show()
+---+------+------+
| id| month|volume|
+---+------+------+
| 1|201601| 100|
| 1|201602| 120|
| 1|201603| 450|
| 1|201604| 200|
| 1|201605| 121|
+---+------+------+
>>> window1 = Window.partitionBy('id').orderBy('month')
>>> window2 = Window.partitionBy('id').orderBy('month').rangeBetween(Window.unboundedPreceding, 0)
>>> df = df.withColumn('new_col', F.sum(F.lag('volume').over(window1)).over(window2)).na.fill({'new_col': 0})
>>> df.show()
+---+------+------+-------+
| id| month|volume|new_col|
+---+------+------+-------+
| 1|201601| 100| 0|
| 1|201602| 120| 100|
| 1|201603| 450| 220|
| 1|201604| 200| 670|
| 1|201605| 121| 870|
+---+------+------+-------+
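For comparison, the same running total can come from a single window whose frame ends one row before the current row. A minimal sketch, assuming the same df as above (the names window3 and df2 are illustrative, not from the answer):
>>> # Frame covers all rows strictly before the current one; it is empty on the
>>> # first row, so sum returns null there and coalesce supplies the zero.
>>> window3 = Window.partitionBy('id').orderBy('month').rowsBetween(Window.unboundedPreceding, -1)
>>> df2 = df.withColumn('new_col', F.coalesce(F.sum('volume').over(window3), F.lit(0)))
df2.show() should then produce the same table as above.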
Upvotes: 1
Reputation: 28322
You can use lag and sum over a window to achieve this. sum automatically computes the cumulative sum when used over an ordered window. The code below first lags the volume column and then takes its cumulative sum, but doing the operations in the opposite order is also possible.
window = Window.partitionBy(F.col('id')).orderBy(F.col('month').asc())
df.withColumn('new_col', F.sum(F.lag(F.col('volume'), 1, 0).over(window)).over(window))
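For completeness, a minimal self-contained sketch of that snippet, assuming sample data matching the question (the Spark session setup here is illustrative):
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, '201601', 100), (1, '201602', 120), (1, '201603', 450),
     (1, '201604', 200), (1, '201605', 121)],
    ['id', 'month', 'volume'])

window = Window.partitionBy(F.col('id')).orderBy(F.col('month').asc())
# lag(..., 1, 0) shifts volume down one row with a default of 0 for the first
# row of each partition, so no na.fill is needed; sum over the same ordered
# window then yields the running total.
df.withColumn('new_col', F.sum(F.lag(F.col('volume'), 1, 0).over(window)).over(window)).show()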
Upvotes: 1