Smita Singh

Reputation: 1

Spark UDF for sequence Generator

All,

I am trying to create a UDF for a Spark DataFrame that generates a unique ID for each row. To ensure uniqueness, the ID generator combines the epoch value of the current timestamp (bigint), a unique source ID passed as an argument, and a 5-digit random number.
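A minimal plain-Scala sketch of this ID layout (no Spark), following the order used in the code below: source ID, then epoch, then the random number. `IdLayout` and the `epochMillis` parameter are hypothetical; `epochMillis` stands in for `CustomEpochGenerator.nextID`, which is not shown. The random part alone does not guarantee uniqueness: two rows with the same source ID stamped in the same millisecond collide with probability 1/90000, which is why a per-row component such as monotonically_increasing_id() is useful.

```scala
import scala.util.Random

object IdLayout {
  private val start = 10000
  private val end = 99999

  // nextInt(n) returns a value in [0, n), so the result lies in [start, end] inclusive
  def fiveDigitRandom(rng: Random): Int = start + rng.nextInt((end - start) + 1)

  // epochMillis is a hypothetical stand-in for CustomEpochGenerator.nextID
  def idGenerator(srcIdentifier: String, epochMillis: Long, rng: Random): String =
    srcIdentifier + epochMillis.toString + fiveDigitRandom(rng).toString
}
```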

I have 2 questions:

  1. How can I include monotonically_increasing_id() in the ID generation function idGenerator?
  2. Using the UDF fails with the error below:
    error: type mismatch;
    found   : String("SRC_")
    required: org.apache.spark.sql.Column
            df.withColumn("rowkey", SequenceGeneratorUtil.GenID("SRC_"))

Any pointers would be appreciated.

import org.apache.spark.sql.functions.udf

object SequenceGeneratorUtil extends Serializable {

    val random = new scala.util.Random
    val start = 10000
    val end = 99999

    // CustomEpochGenerator is a custom object that returns the epoch value of the current timestamp in milliseconds
    // The ID is the source ID passed as an argument + the epoch value of the timestamp (bigint) + a 5-digit random number
    def idGenerator(SrcIdentifier: String): String =
        SrcIdentifier + CustomEpochGenerator.nextID.toString + (start + random.nextInt((end - start) + 1)).toString // + monotonically_increasing_id (not working)

    val GenID = udf[String, String](idGenerator _)

}

val df2 = df.withColumn("rowkey",SequenceGeneratorUtil.GenID("SRC_") ) 

Upvotes: 0

Views: 752

Answers (1)

s.polam

Reputation: 10372

Change the function below

def idGenerator(SrcIdentifier: String ): String = SrcIdentifier + CustomEpochGenerator.nextID.toString + (start + random.nextInt((end - start) + 1)).toString // + monotonically_increasing_id ( not working )

to the following function, which adds an extra parameter mId to idGenerator to hold the monotonically_increasing_id value:

def idGenerator(SrcIdentifier: String, mId: Long): String = SrcIdentifier + CustomEpochGenerator.nextID.toString + (start + random.nextInt((end - start) + 1)).toString + mId

Change the UDF definition below

val GenID = udf[String, String](idGenerator _)

to

val GenID = udf(idGenerator _)
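The `_` after `idGenerator` performs eta-expansion: it turns the method into a function value of type `(String, Long) => String`, and `udf` infers both input types and the return type from that value. If you prefer explicit type parameters, the two-argument form puts the return type first, i.e. `udf[String, String, Long](idGenerator _)`; the original `udf[String, String]` describes a one-argument function. The plain-Scala sketch below shows only the eta-expansion step; `EtaExpansionSketch`, the fixed epoch and the fixed "random" digits are hypothetical placeholders chosen so the output is predictable.

```scala
object EtaExpansionSketch {
  // Hypothetical placeholders so the output is deterministic:
  // a fixed epoch instead of CustomEpochGenerator.nextID, and fixed "random" digits
  private val fixedEpoch = 1700000000000L
  private val fixedRandom = 12345

  def idGenerator(srcIdentifier: String, mId: Long): String =
    srcIdentifier + fixedEpoch.toString + fixedRandom.toString + mId.toString

  // Eta-expansion: the method becomes a function value, which is what udf(...) accepts
  val asFunction: (String, Long) => String = idGenerator _
}
```

For example, `EtaExpansionSketch.asFunction("SRC_", 0L)` returns `"SRC_1700000000000123450"`.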

Regarding the second error:

    error: type mismatch;
    found   : String("SRC_")
    required: org.apache.spark.sql.Column
    df.withColumn("rowkey", SequenceGeneratorUtil.GenID("SRC_"))

This happens because the SequenceGeneratorUtil.GenID UDF expects arguments of type org.apache.spark.sql.Column, but you are passing "SRC_", which is a String.

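To fix this, use the lit function to wrap the string in a literal Column. With the original single-argument UDF, that looks like:

import org.apache.spark.sql.functions.lit

df.withColumn("rowkey", SequenceGeneratorUtil.GenID(lit("SRC_")))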

Finally, change the withColumn call below

val df2 = df.withColumn("rowkey",SequenceGeneratorUtil.GenID("SRC_") ) 

to

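import org.apache.spark.sql.functions.{lit, monotonically_increasing_id}

val df2 = df
            .withColumn(
                "rowkey",
                SequenceGeneratorUtil.GenID(
                    lit("SRC_"), // use lit to pass a static string as a Column
                    monotonically_increasing_id() // unique (but not consecutive) Long per row
                )
            )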
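Putting the pieces together, here is a self-contained sketch that runs on a local SparkSession, assuming spark-sql (Spark 2.3 or later) is on the classpath. `RowKeySketch`, `epochMillis` (a hypothetical stand-in for `CustomEpochGenerator.nextID`) and the sample data are illustrative only. It additionally marks the UDF with `asNondeterministic()`, since its output depends on the clock and a random number and Spark should not assume repeated evaluation gives the same result.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{lit, monotonically_increasing_id, udf}

object RowKeySketch extends Serializable {
  private val start = 10000
  private val end = 99999

  // Hypothetical stand-in for the question's CustomEpochGenerator.nextID
  def epochMillis(): Long = System.currentTimeMillis()

  // Source ID + epoch millis + 5-digit random number + monotonically_increasing_id value
  def idGenerator(srcIdentifier: String, mId: Long): String = {
    val randomPart = start + scala.util.Random.nextInt((end - start) + 1)
    srcIdentifier + epochMillis().toString + randomPart.toString + mId.toString
  }

  // The output depends on the clock and a random number, so mark the UDF as nondeterministic
  val genId: UserDefinedFunction = udf(idGenerator _).asNondeterministic()

  // lit(...) turns the constant into a Column; monotonically_increasing_id() is already a Column
  def withRowKey(df: DataFrame): DataFrame =
    df.withColumn("rowkey", genId(lit("SRC_"), monotonically_increasing_id()))
}
```

In this sketch the epoch part (13 digits for current dates) and the random part (always 5 digits) have fixed widths, so the monotonically_increasing_id() suffix always starts at the same position and distinct mId values yield distinct keys even without a separator.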

Upvotes: 1
