Reputation: 1
All,
I am trying to create a UDF for a Spark DataFrame that generates a unique ID per row. To ensure uniqueness, the ID generator concatenates three parts: a unique source ID passed as an argument, the epoch value of the current timestamp (bigint), and a 5-digit random number.
I have 2 questions :
error: type mismatch;
found   : String("SRC_")
required: org.apache.spark.sql.Column
df.withColumn("rowkey",SequenceGeneratorUtil.GenID("SRC_") )
Please provide any pointer ...
object SequenceGeneratorUtil extends Serializable {
val random = new scala.util.Random
val start = 10000
val end = 99999
//CustomEpochGenerator - this is custom function to generate the epoch value for current timestamp in milliseconds
// ID generator output: unique source ID passed as argument + epoch value of timestamp (bigint) + 5-digit random number
def idGenerator(SrcIdentifier: String ): String = SrcIdentifier + CustomEpochGenerator.nextID.toString + (start + random.nextInt((end - start) + 1)).toString // + monotonically_increasing_id ( not working )
val GenID = udf[String, String](idGenerator __)
}
val df2 = df.withColumn("rowkey",SequenceGeneratorUtil.GenID("SRC_") )
Upvotes: 0
Views: 752
Reputation: 10372
Change the function below
def idGenerator(SrcIdentifier: String ): String = SrcIdentifier + CustomEpochGenerator.nextID.toString + (start + random.nextInt((end - start) + 1)).toString // + monotonically_increasing_id ( not working )
to the function below, which adds an extra parameter mId to idGenerator to hold the monotonically_increasing_id value.
def idGenerator(SrcIdentifier: String,mId: Long): String = SrcIdentifier + CustomEpochGenerator.nextID.toString + (start + random.nextInt((end - start) + 1)).toString + mId
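As a rough illustration of how the two-argument version composes the ID, here is a minimal plain-Scala sketch with no Spark dependency. The post does not show CustomEpochGenerator, so the epochMillis helper below is a hypothetical stand-in based on System.currentTimeMillis, and the object name IdSketch is likewise only for illustration.

```scala
import scala.util.Random

object IdSketch {
  private val random = new Random
  private val start = 10000
  private val end = 99999

  // Hypothetical stand-in for CustomEpochGenerator.nextID (not shown in the post).
  def epochMillis(): Long = System.currentTimeMillis()

  // Source prefix + epoch millis + 5-digit random number + per-row id.
  def idGenerator(srcIdentifier: String, mId: Long): String = {
    // nextInt(n) returns a value in 0 until n, so this stays within 10000..99999.
    val fiveDigits = start + random.nextInt((end - start) + 1)
    srcIdentifier + epochMillis().toString + fiveDigits.toString + mId.toString
  }
}
```

Calling IdSketch.idGenerator("SRC_", 42L) yields a string that starts with SRC_ and ends with 42, with the timestamp and random digits in between.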
Change the udf below
val GenID = udf[String, String](idGenerator __)
to
val GenID = udf(idGenerator _)
It fails with the error below:
error: type mismatch; found : String("SRC_") required : org.apache.spark.sql.Column
df.withColumn("rowkey",SequenceGeneratorUtil.GenID("SRC_") )
This happens because the SequenceGeneratorUtil.GenID udf expects arguments of type org.apache.spark.sql.Column, but "SRC_" is a plain String.
To fix this, wrap the value with the lit function:
df.withColumn("rowkey",SequenceGeneratorUtil.GenID(lit("SRC_")) )
Change the withColumn call below
val df2 = df.withColumn("rowkey",SequenceGeneratorUtil.GenID("SRC_") )
to
val df2 = df
  .withColumn(
    "rowkey",
    SequenceGeneratorUtil.GenID(
      lit("SRC_"), // using lit function to pass static string.
      monotonically_increasing_id()
    )
  )
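Putting the pieces together, the changes above might look like the following end-to-end sketch. It assumes Spark 2.3 or later (for asNondeterministic) and a local SparkSession. The names RowKeySketch, addRowKey, and epochMillis are hypothetical, and epochMillis only stands in for CustomEpochGenerator.nextID, which the question does not show.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{lit, monotonically_increasing_id, udf}

object RowKeySketch {
  // Hypothetical stand-in for CustomEpochGenerator.nextID (not shown in the question).
  private def epochMillis(): Long = System.currentTimeMillis()

  private def idGenerator(src: String, mId: Long): String = {
    val fiveDigits = 10000 + scala.util.Random.nextInt(90000) // 10000..99999
    src + epochMillis().toString + fiveDigits.toString + mId.toString
  }

  // The result depends on the clock and a random number, so the UDF is
  // flagged as non-deterministic for the optimizer.
  val genId = udf(idGenerator _).asNondeterministic()

  // Both UDF arguments are Columns: lit wraps the constant string,
  // and monotonically_increasing_id() already returns a Column.
  def addRowKey(df: DataFrame): DataFrame =
    df.withColumn("rowkey", genId(lit("SRC_"), monotonically_increasing_id()))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("rowkey-sketch")
      .getOrCreate()
    import spark.implicits._

    val df = Seq("a", "b", "c").toDF("value")
    addRowKey(df).show(truncate = false)
    spark.stop()
  }
}
```

Flagging the UDF with asNondeterministic is an optional addition here, not part of the original fix; the type error itself is resolved by passing Columns instead of a String.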
Upvotes: 1