Rajesh Giriyappa

Reputation: 41

Spark ETL Unique Identifier for Entities Generated

We have a requirement in Spark where every record coming from the feed is broken into a set of entities. For example: {col1,col2,col3} => Resource, {col4,col5,col6} => Account, {col7,col8} => EntityX, etc.

Now I need a unique identifier generated in the ETL layer that can be persisted to the respective database table for each of the entities above.

This unique identifier acts as a lookup value to identify each table's records and to generate a sequence in the DB.

  1. The first approach was to use Redis to generate keys for every entity, identified by the natural unique columns in the feed. This approach was not stable: the Redis instance crashed during peak hours, and because Redis is single-threaded it would be slow when I run many ETL jobs in parallel.
  2. My thought is to use a cryptographic hash such as SHA-256 rather than a 32-bit hash. With only 32 bits there is a real possibility of hash collisions for different values, whereas SHA-256 produces a 256-bit digest, so the range of hash values is 2^256 and the probability of a collision is negligible.

But the second option is not well accepted by many people. What other options are there for creating unique keys in the ETL layer that can be looked up in the DB for comparison?

Thanks in advance, Rajesh Giriyappa
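To make the second option concrete, here is a minimal sketch of what I have in mind, in plain Scala without Spark. The object name NaturalKeyHash, the helper entityKey, and the length-prefixed encoding are my own assumptions for illustration, not an established pattern. It only shows that the same natural key columns always produce the same key, regardless of run or dataset size.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

object NaturalKeyHash {
  // Hypothetical helper: derives a deterministic hex key from an entity name
  // and the values of its natural key columns (assumed non-null here).
  def entityKey(entity: String, naturalKeys: Seq[String]): String = {
    // Length-prefix each part so that ("ab", "c") and ("a", "bc") do not
    // collapse into the same input string before hashing.
    val canonical = (entity +: naturalKeys).map(v => s"${v.length}:$v").mkString("|")
    val digest = MessageDigest.getInstance("SHA-256")
      .digest(canonical.getBytes(StandardCharsets.UTF_8))
    // Render the 32 digest bytes as 64 lowercase hex characters.
    digest.map(b => f"$b%02x").mkString
  }
}
```

For example, NaturalKeyHash.entityKey("Resource", Seq("a1", "b1", "c1")) returns the same 64-character string on every run. Inside a DataFrame, the built-in sha2 and concat_ws functions from org.apache.spark.sql.functions could probably play the same role, though null handling and column separators would need care.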

Upvotes: 1

Views: 1472

Answers (3)

Brad LaVigne

Reputation: 66

This is a little late, but in case someone else is looking...

I ran into a similar requirement. As Oli mentioned previously, zipWithIndex will give sequential, zero-indexed IDs, which you can then map onto an offset. Note that loading the offset and saving the new one form a critical section, so a locking mechanism could be required, depending on the use case.

import spark.implicits._ // needed for toDS(); already in scope in spark-shell

case class Resource(_1: String, _2: String, _3: String, id: Option[Long])
case class Account(_4: String, _5: String, _6: String, id: Option[Long])

val inDS = Seq(
  ("a1", "b1", "c1", "x1", "y1", "z1"),
  ("a2", "b2", "c2", "x2", "y2", "z2"),
  ("a3", "b3", "c3", "x3", "y3", "z3")).toDS()

val offset = 1001L // load actual offset from db

// Split each row into its entities, then assign two consecutive ids per row.
val withSeqIdsDS = inDS.map(x => (Resource(x._1, x._2, x._3, None), Account(x._4, x._5, x._6, None)))
  .rdd.zipWithIndex // pair each row with its index, from 0 to n-1
  .map { case ((resource, account), idx) => (
    resource.copy(id = Option(offset + idx * 2)),
    account.copy(id = Option(offset + idx * 2 + 1))
  )}.toDS()

// save new offset to db

withSeqIdsDS.show()
+---------------+---------------+
|             _1|             _2|
+---------------+---------------+
|[a1,b1,c1,1001]|[x1,y1,z1,1002]|
|[a2,b2,c2,1003]|[x2,y2,z2,1004]|
|[a3,b3,c3,1005]|[x3,y3,z3,1006]|
+---------------+---------------+

withSeqIdsDS.select("_1.*", "_2.*").show
+---+---+---+----+---+---+---+----+
| _1| _2| _3|  id| _4| _5| _6|  id|
+---+---+---+----+---+---+---+----+
| a1| b1| c1|1001| x1| y1| z1|1002|
| a2| b2| c2|1003| x2| y2| z2|1004|
| a3| b3| c3|1005| x3| y3| z3|1006|
+---+---+---+----+---+---+---+----+

Upvotes: 1

Rajesh Giriyappa

Reputation: 41

Thanks for the reply. I have tried this method, but it doesn't give me a correlation or surrogate primary key for searching the database. Every time I run the ETL job, the indexes will be different for each record if my dataset count changes. I need a unique ID to correlate with a DB record, one that matches exactly one record and always the same record in the DB.

Are there any good design patterns or practices for comparing an ETL dataset row to a DB record using a unique ID?

Upvotes: 1

Oli

Reputation: 10406

With DataFrames, you can use the monotonicallyIncreasingId function, which "generates monotonically increasing 64-bit integers" (https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.sql.functions$). Since Spark 2.0 it is named monotonically_increasing_id, and the older name is a deprecated alias. It can be used this way:

dataframe.withColumn("INDEX", functions.monotonically_increasing_id())
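As a rough illustration of what those IDs look like: the Spark documentation describes the current implementation as putting the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The plain-Scala model below is only a sketch of that documented layout; the object name MonotonicIdLayout and its id helper are hypothetical and not part of Spark.

```scala
object MonotonicIdLayout {
  // Models the documented bit layout: partition ID in the upper bits,
  // per-partition record number in the lower 33 bits.
  def id(partitionId: Int, recordNumber: Long): Long =
    (partitionId.toLong << 33) | recordNumber
}
```

Under this layout, the first record of partition 1 gets 8589934592 (1L << 33) rather than following on from the last ID of partition 0, so the values are unique and increasing but not consecutive, and they change if the partitioning changes.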

With RDDs, you can use zipWithIndex or zipWithUniqueId. The former generates a real index (ordered from 0 to N-1, N being the size of the RDD), while the latter generates unique long IDs without further guarantees, which seems to be what you need (https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.rdd.RDD). Note that zipWithUniqueId does not even trigger a Spark job and is therefore almost free.
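To show why zipWithUniqueId needs no extra job, here is a small plain-Scala model of the scheme described in the RDD documentation: items in the k-th partition get the IDs k, n+k, 2*n+k, and so on, where n is the number of partitions. The object name UniqueIdScheme and the ids helper are made up for this sketch; it does not call Spark.

```scala
object UniqueIdScheme {
  // Given the number of items in each partition, returns the ids each
  // partition would hand out: the i-th item (0-based) of partition k
  // gets i * n + k, where n is the number of partitions.
  def ids(partitionSizes: Seq[Int]): Seq[Seq[Long]] = {
    val n = partitionSizes.length.toLong
    partitionSizes.zipWithIndex.map { case (size, k) =>
      (0 until size).map(i => i.toLong * n + k)
    }
  }
}
```

Each partition can compute its IDs from its own index and n alone, without knowing how many items the other partitions hold. The flip side is that the IDs may have gaps and depend on how the data happens to be partitioned, so the same record can receive a different ID on another run.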

Upvotes: 1
