Ayan Biswas

Reputation: 1635

How to create a unique autogenerated ID column in a Spark dataframe

I have a dataframe in which I need to generate a unique ID in one of the columns, and that ID has to start from an offset. I need to persist this dataframe with the autogenerated ID, so when new data comes in, the new IDs must not collide with the existing ones. I looked at monotonically_increasing_id(), but it does not accept any offset. This is what I tried:

df=df.coalesce(1);
df = df.withColumn(inputCol,functions.monotonically_increasing_id());

But is there a way to make the monotonically_increasing_id() start from a starting offset ?

Upvotes: 10

Views: 28768

Answers (3)

Gregory Deardurff

Reputation: 11

You could add a row number to your columns and then add to it the maximum existing identity value, or your offset. Once the id is set, drop the row_number column.

from pyspark.sql import functions as sf
from pyspark.sql.window import Window

# Could also grab the existing max ID value instead of a hard-coded seed
seed_value = 123

window = Window.partitionBy(sf.col("natural_key")).orderBy(sf.col("anything"))
df = df.withColumn("row_number", sf.row_number().over(window))

df = df.withColumn("id", sf.col("row_number") + seed_value)

Remember to drop the row_number column afterwards.
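The seeding step mentioned in the comment above (using the existing max ID instead of a hard-coded value) comes down to simple arithmetic. A plain-Python sketch of that logic, with made-up existing IDs standing in for what a Spark aggregation like sf.max("id") would return:

```python
# Hypothetical IDs already persisted; in Spark you would fetch this with
# something like df.agg(sf.max("id")).first()[0]
existing_ids = [124, 125, 126]

# Seed from the current maximum (0 if the table is empty)
seed_value = max(existing_ids, default=0)

# New rows get row numbers 1, 2, 3, ... and the seed is added on top
new_rows = ["x", "y", "z"]
new_ids = [seed_value + n for n, _ in enumerate(new_rows, start=1)]
# new_ids == [127, 128, 129] -- continues from the existing maximum
```

This is why the id values never collide with what was already persisted, as long as the seed is refreshed before each new batch.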

Upvotes: 1

abiratsis

Reputation: 7316

Or, if you don't want to restrict your program to a single partition with df.coalesce(1), you can use zipWithIndex, which starts at index = 0, like this:

lines = [["a1", "a2", "a3"],
         ["b1", "b2", "b3"],
         ["c1", "c2", "c3"]]

cols = ["c1", "c2", "c3"]

df = spark.createDataFrame(lines, cols)

start_indx = 10
df = df.rdd.zipWithIndex() \
       .map(lambda ri: (ri[1] + start_indx, ri[0][0], ri[0][1], ri[0][2])) \
       .toDF(["id", "c1", "c2", "c3"])

df.show(10, False)

In this case I set start_indx = 10, and this will be the output:

+---+---+---+---+
|id |c1 |c2 |c3 |
+---+---+---+---+
|10 |a1 |a2 |a3 |
|11 |b1 |b2 |b3 |
|12 |c1 |c2 |c3 |
+---+---+---+---+
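The arithmetic the RDD pipeline performs can be modeled in plain Python (using the same sample rows), which also shows why zipWithIndex gives dense, gap-free ids, unlike monotonically_increasing_id():

```python
# Plain-Python model of rdd.zipWithIndex(): each row is paired with a
# consecutive 0-based index in row order, then the offset is added
rows = [["a1", "a2", "a3"],
        ["b1", "b2", "b3"],
        ["c1", "c2", "c3"]]
start_indx = 10

with_ids = [(indx + start_indx, *r) for indx, r in enumerate(rows)]
# id column: 10, 11, 12 -- consecutive, no gaps between partitions
```

The trade-off is that zipWithIndex triggers an extra Spark job to count records per partition, whereas monotonically_increasing_id() is computed locally but leaves gaps.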

Upvotes: 6

ollik1

Reputation: 4540

You can simply add an offset to it to provide a minimum value for the id. Note that the values are not guaranteed to start from the minimum value.

.withColumn("id", monotonically_increasing_id() + 123)

Explanation: the + operator is overloaded for columns: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Column.scala#L642
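To see why the resulting ids are not contiguous: per the Spark docs, monotonically_increasing_id() puts the partition ID in the upper 31 bits and the per-partition record number in the lower 33 bits. A plain-Python sketch of that layout (the function name mono_id is made up for illustration):

```python
def mono_id(partition_id, record_index):
    # Partition ID in the upper bits, record number within the partition
    # in the lower 33 bits, as described in the Spark docs
    return (partition_id << 33) | record_index

offset = 123
# First record of partition 0: 0 + offset
print(mono_id(0, 0) + offset)   # 123
# First record of partition 1 jumps by 2**33 before the offset is added
print(mono_id(1, 0) + offset)   # 8589934715
```

So adding an offset shifts every id up by the same amount, guaranteeing a minimum, but the gaps between partitions remain.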

Upvotes: 9
