GeorgeOfTheRF

Reputation: 8844

How to create a row_index for a Spark DataFrame using window.partitionBy()?

I have a DataFrame with a single column, as shown below.

Type
'BAT'
'BAT'
'BALL'
'BAT'
'BALL'
'BALL'
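
For reference, this sample DataFrame can be reproduced with a minimal sketch like the one below (the SparkSession variable spark is an assumption, not part of the question):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Sample data from the 'Type' table above
df = spark.createDataFrame(
    [('BAT',), ('BAT',), ('BALL',), ('BAT',), ('BALL',), ('BALL',)],
    ['Type'])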

To the above DataFrame I have added a new column called 'const':

df = df.withColumn('const',F.lit(1))

How do I perform a cumulative sum over the 'const' column using window.partitionBy() and create a new row_id column?

Expected Output

Type  row_id
'BAT'   1
'BAT'   2
'BALL'  3
'BAT'   4
'BALL'  5
'BALL'  6

I also don't want to use RDDs; everything should stay in the DataFrame API for performance reasons.

Upvotes: 0

Views: 5403

Answers (2)

SarAI

Reputation: 1

Using row_number() directly can change the original row order when the window is ordered by a column that has the same value in every row. To avoid that, first use monotonically_increasing_id() to create a new column "row_order" that preserves the original row order (it produces a monotonically increasing number), then apply row_number() over a window ordered by that generated "row_order" column. Here is an example:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Preserve the original row order with a monotonically increasing id
df = df.withColumn('row_order', F.monotonically_increasing_id())

# Number the rows in that order
w = Window().partitionBy().orderBy("row_order")
df = df.withColumn('row_id', F.row_number().over(w))

# Drop the helper column
df = df.drop("row_order")

This will ensure you keep the original row order in your table after applying the window.
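
With the sample data from the question, this should reproduce the expected output in the original row order (output shown approximately; the 'const' column is left out for brevity):

df.select('Type', 'row_id').show()
# +----+------+
# |Type|row_id|
# +----+------+
# | BAT|     1|
# | BAT|     2|
# |BALL|     3|
# | BAT|     4|
# |BALL|     5|
# |BALL|     6|
# +----+------+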

Upvotes: 0

Steven

Reputation: 15258

If you just want a row index that does not take the values into account, then use:

df = df.withColumn('row_id',F.monotonically_increasing_id())

This will create a unique index for each row.

If you want to take your values into account, and give duplicate values the same index, then use rank:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Equal values in 'Type' receive the same rank
w = Window().partitionBy().orderBy("Type")
df = df.withColumn('row_id', F.rank().over(w))
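
With the sample data, rank gives every duplicate value the same index, so the result looks roughly like this (the rows typically come back sorted by Type rather than in their original order):

df.select('Type', 'row_id').show()
# +----+------+
# |Type|row_id|
# +----+------+
# |BALL|     1|
# |BALL|     1|
# |BALL|     1|
# | BAT|     4|
# | BAT|     4|
# | BAT|     4|
# +----+------+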

You can of course achieve the same with sum or row_number, but I think the two methods above are better.

import sys
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Running sum of 'const' over all preceding rows
# (Window.unboundedPreceding can be used instead of -sys.maxsize)
w = Window().partitionBy().orderBy().rowsBetween(-sys.maxsize, 0)
df = df.withColumn('row_id', F.sum("const").over(w))

or

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Number the rows, ordering by the constant 'const' column
w = Window().partitionBy().orderBy("const")
df = df.withColumn('row_id', F.row_number().over(w))

Upvotes: 2
