SHM

Reputation: 303

How to add a column with sequence values in a Spark dataframe?

How can I add a column with sequence values, starting from a specific number, to a PySpark dataframe?

Current Dataset:

Col1    Col2    Flag
Val1    Val2    F
Val3    Val4    T

But I want the dataset to look like this:

Col1    Col2    Flag    New_Col
Val1    Val2    F       11F
Val3    Val4    T       12T

I'm using the code below, in Python:

from pyspark.sql import functions as F
from pyspark.sql import types as T

seq = 10

def fn_increment_id(flag):
    # Intended to increment the global counter once per row
    # and prepend the new value to the flag
    global seq
    seq += 1
    return str(seq) + flag

if __name__ == "__main__":
    df = spark.loadFromMapRDB("path/to/table")
    my_udf = F.UserDefinedFunction(fn_increment_id, T.StringType())
    df = df.withColumn("New_Col", my_udf("Flag"))
    df.show(10)  # show() prints the dataframe itself; no print() needed

But I end up with this result:

Received Dataset:

Col1    Col2    Flag    New_Col
Val1    Val2    F       11F
Val3    Val4    T       11T

So the counter was incremented only once for all rows. How can I make it increment for each row? Thanks in advance.

Upvotes: 3

Views: 11496

Answers (1)

Shaido

Reputation: 28392

A column with sequential values can be added by using a Window. This is fine as long as the dataframe is not too big: a window without partitionBy moves all rows into a single partition. For larger dataframes you should consider using partitionBy on the window, but the values will then no longer be globally sequential (see the sketch after the code below). As for the UDF approach: the function is serialized and executed on the executors, where each Python worker process keeps its own copy of the global seq, so the counter is never shared across all rows.

The code below creates a sequential number for each row with row_number, adds 10 to it, and then concatenates that value with the Flag column to create the new column. Here the rows are ordered by Col1, but any column can be used.

from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, concat

# No partitionBy: all rows go to a single partition, so numbering is global
w = Window.orderBy("Col1")
df = df.withColumn(
    "New_Col",
    concat((row_number().over(w) + 10).cast("string"), col("Flag"))
)
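
For the sample data above, df.show() should then give something like:

Col1    Col2    Flag    New_Col
Val1    Val2    F       11F
Val3    Val4    T       12T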
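
For larger dataframes, a minimal sketch of the partitionBy variant mentioned above could look like this (assuming Flag is an acceptable column to partition on; row_number restarts at 1 within each partition, so the values are only sequential per partition):

# Numbering restarts within each Flag partition
w = Window.partitionBy("Flag").orderBy("Col1")
df = df.withColumn(
    "New_Col",
    concat((row_number().over(w) + 10).cast("string"), col("Flag"))
)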

Upvotes: 4
