Reputation: 8844
I have a DataFrame with a single column, as shown below.
Type
'BAT'
'BAT'
'BALL'
'BAT'
'BALL'
'BALL'
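For reference, this sample DataFrame could be built like so (a minimal sketch, assuming an active SparkSession named spark):
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Six rows with a single string column named 'Type'
df = spark.createDataFrame([('BAT',), ('BAT',), ('BALL',), ('BAT',), ('BALL',), ('BALL',)], ['Type'])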
To the above DataFrame I have added a new column called 'const'.
df = df.withColumn('const',F.lit(1))
How do I perform a cumulative sum using Window.partitionBy() on the 'const' column and create a new row_id column?
Expected Output
Type row_id
'BAT' 1
'BAT' 2
'BALL' 3
'BAT' 4
'BALL' 5
'BALL' 6
I also don't want to use RDDs; everything should stay in the DataFrame API for performance reasons.
Upvotes: 0
Views: 5403
Reputation: 1
Using the row_number() function directly may change the original row order when the window is ordered by a column that has the same value in every row. To avoid that, first use monotonically_increasing_id() to create a new column "row_order" that preserves the original row order (it produces a monotonically increasing number). Then apply row_number() with the window ordered by the generated "row_order" column. Here is an example:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Capture the original row order before any window is applied
df = df.withColumn('row_order',F.monotonically_increasing_id())
# Single-partition window ordered by the captured row order
w = Window().partitionBy().orderBy("row_order")
df = df.withColumn('row_id',F.row_number().over(w))
# The helper column is no longer needed
df = df.drop("row_order")
This will ensure you keep the original row order in your table after applying the window.
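Applied to the sample data, this should produce row_id values 1 through 6 in the original row order, matching the expected output in the question; a quick check (sketch):
df.orderBy("row_id").show()
# Expected: 'BAT' 1, 'BAT' 2, 'BALL' 3, 'BAT' 4, 'BALL' 5, 'BALL' 6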
Upvotes: 0
Reputation: 15258
If you just want a row index without taking the values into account, then use:
df = df.withColumn('row_id',F.monotonically_increasing_id())
This will create a unique index for each line. Note that monotonically_increasing_id() guarantees increasing, unique values, but not consecutive ones, so the ids will not necessarily be 1 to 6.
If you want to take your values into account and give duplicate values the same index, then use rank:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window().partitionBy().orderBy("type")
df = df.withColumn('row_id',F.rank().over(w))
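On the sample data this gives every 'BALL' row the value 1 and every 'BAT' row the value 4, because rank() leaves gaps after ties. If gap-free numbering is preferred, dense_rank() could be swapped in (a sketch, using the question's 'Type' column name):
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# dense_rank() numbers distinct values without gaps: 'BALL' -> 1, 'BAT' -> 2
w = Window().partitionBy().orderBy("Type")
df = df.withColumn('row_id',F.dense_rank().over(w))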
You can of course achieve the same with sum or row_number (both shown below), but I think the two methods above are better:
import sys
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Frame covering every row from the start of the partition up to the current row,
# so summing the constant column yields a running count
w = Window().partitionBy().orderBy().rowsBetween(-sys.maxsize,0)
df = df.withColumn('row_id',F.sum("const").over(w))
or
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Ordering by the constant column is allowed, but the resulting row order is not guaranteed
w = Window().partitionBy().orderBy("const")
df = df.withColumn('row_id',F.row_number().over(w))
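Side note on the sum version above: in Spark 2.1+ the same frame can be written with the built-in boundary constants instead of sys.maxsize (a minimal sketch, not from the original answer):
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Explicit frame from the start of the partition to the current row
w = Window().partitionBy().orderBy("const").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn('row_id',F.sum("const").over(w))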
Upvotes: 2