Nikhil

Reputation: 57

Want to generate unique Ids as value changes from previous row using scala

I want to generate unique IDs as the value changes from the previous row in a given column. I have a DataFrame in Spark Scala and want to add a Unique_ID column to it. I cannot use row_number over partitions or a groupBy, because the same Product_ID appears multiple times, and I want a new Unique_ID every time it reappears in the column.

Product_IDs Unique_ID
Prod_1           1
Prod_1           1
Prod_1           1
Prod_2           2
Prod_3           3
Prod_3           3
Prod_2           4
Prod_3           5
Prod_1           6
Prod_1           6
Prod_4           7

I need to produce this DataFrame using Spark Scala.

Upvotes: 1

Views: 2956

Answers (2)

Tzach Zohar

Reputation: 37822

Here's a solution which isn't necessarily the most efficient (I admit I couldn't find a way to optimize it) and is a bit long, but it works.

I'm assuming the input is composed of records represented by this case class:

case class Record(id: Int, productId: String)

Where the id defines the order.

We'll perform two calculations:

  1. For each record, find the minimum id of any subsequent record with a different productId
  2. Group by that value (which represents a group of consecutive records with the same productId), and then zipWithIndex to create the unique ID we're interested in

I'm mixing RDD operations (for #2) and SQL (for #1) mostly for convenience; I'm assuming both operations can be done in either API, although I didn't try (see the DataFrame API sketch after the first output below):

val input = sqlContext.createDataFrame(Seq(
  Record(1, "Prod_1"),
  Record(2, "Prod_1"),
  Record(3, "Prod_1"),
  Record(4, "Prod_2"),
  Record(5, "Prod_3"),
  Record(6, "Prod_3"),
  Record(7, "Prod_2"),
  Record(8, "Prod_3"),
  Record(9, "Prod_1"),
  Record(10, "Prod_1"),
  Record(11, "Prod_4")
))

input.registerTempTable("input")

// Step 1: find "nextShiftId" for each record
val withBlockId = sqlContext.sql(
  """
    |SELECT FIRST(a.id) AS id, FIRST(a.productId) AS productId, MIN(b.id) AS nextShiftId
    |FROM input a
    |LEFT JOIN input b ON a.productId != b.productId AND a.id < b.id
    |GROUP BY a.id
  """.stripMargin)

withBlockId.show()
// prints:
// +---+---------+-----------+
// | id|productId|nextShiftId|
// +---+---------+-----------+
// |  1|   Prod_1|          4|
// |  2|   Prod_1|          4|
// |  3|   Prod_1|          4|
// |  4|   Prod_2|          5|
// |  5|   Prod_3|          7|
// |  6|   Prod_3|          7|
// |  7|   Prod_2|          8|
// |  8|   Prod_3|          9|
// |  9|   Prod_1|         11|
// | 10|   Prod_1|         11|
// | 11|   Prod_4|       null|
// +---+---------+-----------+
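
As a side note on the "either API" assumption above: step 1 could likely be written in the DataFrame API as well. This is an untested sketch of the same self-join; the alias names a and b and the val name withBlockIdDf are mine, and it assumes Spark 1.x, where !== is the column inequality operator:

import sqlContext.implicits._
import org.apache.spark.sql.functions.{first, min}

// Untested sketch: the same LEFT JOIN + GROUP BY as the SQL above
val a = input.as("a")
val b = input.as("b")
val withBlockIdDf = a
  .join(b, $"a.productId" !== $"b.productId" && $"a.id" < $"b.id", "left_outer")
  .groupBy($"a.id")
  .agg(first($"a.productId").as("productId"), min($"b.id").as("nextShiftId"))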

// Step 2: group by "productId" and "nextShiftId"
val resultRdd = withBlockId.rdd
  .groupBy(r => (r.getAs[String]("productId"), r.getAs[Int]("nextShiftId")))
  // sort by nextShiftId to get the order right before adding the index
  .sortBy {
     // getAs[Int] unboxes the SQL NULL of the last block to 0, so this case
     // sends the last batch (where nextShiftId is null) to the end
     case ((prodId, 0), v) => Long.MaxValue
     case ((prodId, nextShiftId), v) => nextShiftId.toLong
   }
  // zip with index (which becomes the "unique id") and flatMap to just what we need:
  .values
  .zipWithIndex()
  .flatMap { case (records, index) => records.map(r => (r.getAs[String]("productId"), index + 1)) }

// transform back into DataFrame:
val result = sqlContext.createDataFrame(resultRdd)

result.show()
// prints:
// +------+---+
// |    _1| _2|
// +------+---+
// |Prod_1|  1|
// |Prod_1|  1|
// |Prod_1|  1|
// |Prod_2|  2|
// |Prod_3|  3|
// |Prod_3|  3|
// |Prod_2|  4|
// |Prod_3|  5|
// |Prod_1|  6|
// |Prod_1|  6|
// |Prod_4|  7|
// +------+---+
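
If you want the column names from the question instead of the default _1/_2, the result can be renamed (assuming Spark 1.3+, where toDF with explicit names is available):

val named = result.toDF("Product_IDs", "Unique_ID")
named.show() // same rows, with headers Product_IDs and Unique_ID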

Upvotes: 0

radek1st

Reputation: 1647

There are two ways to add a column with unique IDs that I can think of just now. One is to use zipWithUniqueId:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val rows = df.rdd.zipWithUniqueId().map {
  case (r: Row, id: Long) => Row.fromSeq(r.toSeq :+ id)
}

val newDf = sqlContext.createDataFrame(rows, StructType(df.schema.fields :+ StructField("uniqueIdColumn", LongType, false)))

Another one is to use the monotonicallyIncreasingId function:

import org.apache.spark.sql.functions.monotonicallyIncreasingId 
val newDf = df.withColumn("uniqueIdColumn", monotonicallyIncreasingId)
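
Note that both approaches produce IDs that are unique and increasing, but not necessarily consecutive. Also, from Spark 2.0 on the same function is exposed as monotonically_increasing_id, so a version of this snippet for the newer API would look like:

import org.apache.spark.sql.functions.monotonically_increasing_id

// same idea, with the Spark 2.0+ function name
val newDf = df.withColumn("uniqueIdColumn", monotonically_increasing_id())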

Upvotes: 1
