Reputation: 57
I want to generate a unique ID whenever the value changes from the previous row in a given column. I have a DataFrame in Spark Scala and want to add a Unique_ID column to it. I cannot use row_number over partitions or a groupBy, because the same Product_IDs appear multiple times and I want a new Unique_ID each time a Product_ID reappears in the column.
Product_IDs Unique_ID
Prod_1 1
Prod_1 1
Prod_1 1
Prod_2 2
Prod_3 3
Prod_3 3
Prod_2 4
Prod_3 5
Prod_1 6
Prod_1 6
Prod_4 7
I need this dataframe using Spark Scala.
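For example, something like dense_rank keyed on Product_IDs is not what I want, because it gives every occurrence of a product the same ID even when it reappears after other products (a rough sketch; df stands for my existing dataframe and I assume the rows already have an order):
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.dense_rank
// Not what I want: every Prod_2 row gets the same ID, no matter where it appears.
val notWhatIWant = df.withColumn("Unique_ID", dense_rank().over(Window.orderBy("Product_IDs")))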
Upvotes: 1
Views: 2956
Reputation: 37822
Here's a solution which isn't necessarily the most efficient (I admit I couldn't find a way to optimize it) and is a bit long, but it works.
I'm assuming the input is composed of records represented by this case class:
case class Record(id: Int, productId: String)
Where the id defines the order.
We'll perform two calculations:
1. For each record, find the minimum id of any subsequent record with a different productId (call it nextShiftId)
2. Group the records by productId and nextShiftId, and then zipWithIndex to create the unique ID we're interested in
I'm mixing RDD operations (for #2) and SQL (for #1) mostly for convenience; I'm assuming both operations can be done in either API (although I didn't try):
val input = sqlContext.createDataFrame(Seq(
Record(1, "Prod_1"),
Record(2, "Prod_1"),
Record(3, "Prod_1"),
Record(4, "Prod_2"),
Record(5, "Prod_3"),
Record(6, "Prod_3"),
Record(7, "Prod_2"),
Record(8, "Prod_3"),
Record(9, "Prod_1"),
Record(10, "Prod_1"),
Record(11, "Prod_4")
))
input.registerTempTable("input")
// Step 1: find "nextShiftId" for each record
val withBlockId = sqlContext.sql(
"""
|SELECT FIRST(a.id) AS id, FIRST(a.productId) AS productId, MIN(b.id) AS nextShiftId
|FROM input a
|LEFT JOIN input b ON a.productId != b.productId AND a.id < b.id
|GROUP BY a.id
""".stripMargin)
withBlockId.show()
// prints:
// +---+---------+-----------+
// | id|productId|nextShiftId|
// +---+---------+-----------+
// | 1| Prod_1| 4|
// | 2| Prod_1| 4|
// | 3| Prod_1| 4|
// | 4| Prod_2| 5|
// | 5| Prod_3| 7|
// | 6| Prod_3| 7|
// | 7| Prod_2| 8|
// | 8| Prod_3| 9|
// | 9| Prod_1| 11|
// | 10| Prod_1| 11|
// | 11| Prod_4| null|
// +---+---------+-----------+
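// For reference, step 1 could presumably also be written with the DataFrame API;
// a rough, untested sketch (the aliases and the not(...) predicate are assumptions,
// not something I verified):
import org.apache.spark.sql.functions.{col, first, min, not}
val a = input.as("a")
val b = input.as("b")
val withBlockIdAlt = a
  .join(b, not(col("a.productId") === col("b.productId")) && col("a.id") < col("b.id"), "left_outer")
  .groupBy(col("a.id"))
  .agg(first(col("a.productId")).as("productId"), min(col("b.id")).as("nextShiftId"))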
// Step 2: group by "productId" and "nextShiftId"
val resultRdd = withBlockId.rdd
.groupBy(r => (r.getAs[String]("productId"), r.getAs[Int]("nextShiftId")))
// sort by nextShiftId to get the order right before adding index
.sortBy {
case ((prodId, 0), v) => Long.MaxValue // to handle the last batch where nextShiftId is null
case ((prodId, nextShiftId), v) => nextShiftId.toLong // widen to Long to match the Long.MaxValue branch
}
// zip with index (which would be the "unique id") and flatMap to just what we need:
.values
.zipWithIndex()
.flatMap { case (records, index) => records.map(r => (r.getAs[String]("productId"), index+1))}
// transform back into DataFrame:
val result = sqlContext.createDataFrame(resultRdd)
result.show()
// prints:
// +------+---+
// | _1| _2|
// +------+---+
// |Prod_1| 1|
// |Prod_1| 1|
// |Prod_1| 1|
// |Prod_2| 2|
// |Prod_3| 3|
// |Prod_3| 3|
// |Prod_2| 4|
// |Prod_3| 5|
// |Prod_1| 6|
// |Prod_1| 6|
// |Prod_4| 7|
// +------+---+
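If you want the original column names rather than _1/_2, the tuple columns could be renamed when building the result, e.g. (untested; the names are just chosen to match the question):
// name the tuple columns to match the desired output
val named = sqlContext.createDataFrame(resultRdd).toDF("Product_IDs", "Unique_ID")
named.show()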
Upvotes: 0
Reputation: 1647
There are two ways to add a column with unique IDs that I can think of just now. One is to use zipWithUniqueId:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}
// append the generated id to each row's values
val rows = df.rdd.zipWithUniqueId().map {
  case (r: Row, id: Long) => Row.fromSeq(r.toSeq :+ id)
}
// extend the original schema with the new non-nullable long column
val newDf = sqlContext.createDataFrame(rows, StructType(df.schema.fields :+ StructField("uniqueIdColumn", LongType, false)))
Another is to use the monotonicallyIncreasingId function:
import org.apache.spark.sql.functions.monotonicallyIncreasingId
val newDf = df.withColumn("uniqueIdColumn", monotonicallyIncreasingId)
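Note that both of these assign a distinct ID to every row; on their own they don't give consecutive rows with the same Product_IDs a shared ID that only changes when the value changes. One way to get that behaviour is a window-based sketch (a different technique, not part of the above; it assumes an ordering column named id and a column named Product_IDs):
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag, sum, when}
// order by the (assumed) id column; note that a window with no partitionBy
// pulls all rows into a single partition
val byId = Window.orderBy("id")
// flag rows where Product_IDs differs from the previous row
val flagged = df.withColumn(
  "changed",
  when(lag("Product_IDs", 1).over(byId) === col("Product_IDs"), 0).otherwise(1))
// a running sum of the flags produces a new Unique_ID at every change
val withUniqueId = flagged
  .withColumn("Unique_ID", sum("changed").over(byId.rowsBetween(Long.MinValue, 0)))
  .drop("changed")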
Upvotes: 1