Reputation: 1635
I have a dataframe in which I need to generate a unique id in one of the columns. The id has to be generated with an offset: I need to persist this dataframe with the autogenerated ids, and when new data comes in, the newly generated ids must not collide with the existing ones. I checked the monotonically_increasing_id function, but it does not accept an offset. This is what I tried:
df=df.coalesce(1);
df = df.withColumn(inputCol,functions.monotonically_increasing_id());
But is there a way to make monotonically_increasing_id() start from a given offset?
Upvotes: 10
Views: 28768
Reputation: 11
You could add a row number to your rows and then add it to the maximum existing id value, or to your offset. Once the id is set, drop the row number column.
from pyspark.sql import functions as sf
from pyspark.sql.window import Window
# Could also grab the existing max ID value
seed_value = 123
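# row_number() replaces the older rowNumber(), which is no longer available in Spark 2.x and later.
# Numbering restarts at 1 inside each natural_key partition; use Window.orderBy(...) alone if ids must be unique across the whole dataframe.
df = df.withColumn("row_number", sf.row_number().over(Window.partitionBy(sf.col("natural_key")).orderBy(sf.col("anything"))))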
df = df.withColumn("id", sf.col("row_number")+seed_value)
Remember to drop the row_number column afterwards:
df = df.drop("row_number")
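The arithmetic behind this approach can be checked without Spark. The following is a minimal plain-Python sketch, not Spark code; the helper name next_ids and the sample values are hypothetical. It assumes the seed is the largest id already persisted (or 0 when nothing has been stored yet) and that row numbers are 1-based, as row_number() produces them.

```python
def next_ids(existing_ids, new_row_count):
    """Return ids for new rows that cannot collide with existing_ids."""
    # Seed with the largest persisted id; fall back to 0 for an empty table.
    seed_value = max(existing_ids, default=0)
    # Row numbers are 1-based, so the first new id is seed_value + 1.
    return [seed_value + row_number for row_number in range(1, new_row_count + 1)]


existing = [121, 122, 123]
new_ids = next_ids(existing, 3)
print(new_ids)  # [124, 125, 126]
```

Because every new id is strictly greater than the current maximum, repeating this on each incoming batch keeps the ids disjoint, provided the maximum is read again before each batch.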
Upvotes: 1
Reputation: 7316
Or, if you don't want to restrict your program to a single partition with df.coalesce(1), you can use zipWithIndex, which starts at index 0, as follows:
lines = [["a1", "a2", "a3"],
["b1", "b2", "b3"],
["c1", "c2", "c3"]]
cols = ["c1", "c2", "c3"]
df = spark.createDataFrame(lines, cols)
start_indx = 10
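# zipWithIndex yields (row, index) pairs; tuple parameters in a lambda are not valid in Python 3
df = df.rdd.zipWithIndex() \
.map(lambda pair: (pair[1] + start_indx, pair[0][0], pair[0][1], pair[0][2])) \
.toDF(["id", "c1", "c2", "c3"])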
df.show(10, False)
In this case I set start_indx = 10, and this will be the output:
+---+---+---+---+
|id |c1 |c2 |c3 |
+---+---+---+---+
|10 |a1 |a2 |a3 |
|11 |b1 |b2 |b3 |
|12 |c1 |c2 |c3 |
+---+---+---+---+
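For intuition about why this works across several partitions: zipWithIndex numbers elements by partition order first and by position within each partition second, which is why Spark has to run a job to count the partition sizes when the RDD has more than one partition. Below is a rough plain-Python model of that numbering with the offset applied; it is not Spark code, and the function name and sample partitions are made up for illustration.

```python
from itertools import accumulate


def zip_with_index(partitions, start_indx=0):
    """Model of offset indexing over a list of partitions (lists of rows)."""
    sizes = [len(part) for part in partitions]
    # First index of each partition = number of rows in all earlier partitions.
    starts = [0] + list(accumulate(sizes))[:-1]
    return [
        (start + pos + start_indx, row)
        for part, start in zip(partitions, starts)
        for pos, row in enumerate(part)
    ]


partitions = [[("a1", "a2", "a3")], [("b1", "b2", "b3"), ("c1", "c2", "c3")]]
indexed = zip_with_index(partitions, start_indx=10)
print([idx for idx, _ in indexed])  # [10, 11, 12]
```

The ids come out consecutive regardless of how the rows are split, which is the main difference from monotonically_increasing_id.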
Upvotes: 6
Reputation: 4540
You can simply add a constant to it to provide a minimum value for the id. Note that the values are not guaranteed to start from that minimum value.
.withColumn("id", monotonically_increasing_id() + 123)
Explanation: the + operator is overloaded for columns: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Column.scala#L642
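To see why the resulting ids stay unique but are not consecutive: the Spark documentation describes the generated id as the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The plain-Python sketch below models that layout with the + 123 offset applied; the function name model_id is hypothetical, and this is an illustration rather than Spark's implementation.

```python
RECORD_BITS = 33  # lower 33 bits hold the record number within the partition


def model_id(partition_id, record_number, offset=0):
    """Model of the documented id layout, shifted by a constant offset."""
    return (partition_id << RECORD_BITS) + record_number + offset


# Two partitions with three records each, offset by 123.
ids = [model_id(p, r, offset=123) for p in range(2) for r in range(3)]
print(ids)  # [123, 124, 125, 8589934715, 8589934716, 8589934717]
```

Adding a constant shifts every id by the same amount, so uniqueness is preserved, but the jump between partitions remains. A later batch generated the same way would reuse the same values, so the offset for new data would need to exceed the largest id already stored.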
Upvotes: 9