Reputation: 41
We have a requirement in Spark where every record coming from the feed is broken into a set of entities.
For example:
{col1, col2, col3} => Resource
{col4, col5, col6} => Account
{col7, col8} => EntityX
etc.
Now I need a unique identifier generated in the ETL layer which can be persisted to the corresponding database table for each of the above-mentioned tables/entities.
This unique identifier acts as a lookup value to identify each table's records and to generate a sequence in the DB.
But the second option is not well accepted by many people. What are the other options/solutions to create unique keys in the ETL layer which can be looked up in the DB for comparison?
Thanks in Advance, Rajesh Giriayppa
Upvotes: 1
Views: 1472
Reputation: 66
This is a little late, but in case someone else is looking...
I ran into a similar requirement. As Oli mentioned previously, zipWithIndex gives sequential, zero-indexed IDs, which you can then map onto an offset. Note that reading and advancing the offset is a critical section, so a locking mechanism could be required, depending on the use case.
// assumes an active SparkSession named `spark`
import spark.implicits._

case class Resource(_1: String, _2: String, _3: String, id: Option[Long])
case class Account(_4: String, _5: String, _6: String, id: Option[Long])

val inDS = Seq(
  ("a1", "b1", "c1", "x1", "y1", "z1"),
  ("a2", "b2", "c2", "x2", "y2", "z2"),
  ("a3", "b3", "c3", "x3", "y3", "z3")).toDS()

val offset = 1001L // load actual offset from db

val withSeqIdsDS = inDS
  .map(x => (Resource(x._1, x._2, x._3, None), Account(x._4, x._5, x._6, None)))
  .rdd.zipWithIndex // index from 0 to n-1
  .map(x => (
    x._1._1.copy(id = Option(offset + x._2 * 2)),     // even slot for Resource
    x._1._2.copy(id = Option(offset + x._2 * 2 + 1))  // odd slot for Account
  )).toDS()

// save new offset (offset + 2 * count) back to the db
withSeqIdsDS.show()
+---------------+---------------+
| _1| _2|
+---------------+---------------+
|[a1,b1,c1,1001]|[x1,y1,z1,1002]|
|[a2,b2,c2,1003]|[x2,y2,z2,1004]|
|[a3,b3,c3,1005]|[x3,y3,z3,1006]|
+---------------+---------------+
withSeqIdsDS.select("_1.*", "_2.*").show
+---+---+---+----+---+---+---+----+
| _1| _2| _3| id| _4| _5| _6| id|
+---+---+---+----+---+---+---+----+
| a1| b1| c1|1001| x1| y1| z1|1002|
| a2| b2| c2|1003| x2| y2| z2|1004|
| a3| b3| c3|1005| x3| y3| z3|1006|
+---+---+---+----+---+---+---+----+
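To address the "critical section" caveat above, here is a minimal sketch of how the offset could be reserved and advanced atomically. The table name seq_offsets(entity, next_id) and the reserveIds helper are hypothetical; it uses plain JDBC with SELECT ... FOR UPDATE, so adapt it to your database and locking strategy.

import java.sql.DriverManager

// Hypothetical helper: atomically reserve `count` ids and return the starting offset.
// SELECT ... FOR UPDATE keeps concurrent jobs from handing out the same range.
def reserveIds(jdbcUrl: String, user: String, pass: String,
               entity: String, count: Long): Long = {
  val conn = DriverManager.getConnection(jdbcUrl, user, pass)
  try {
    conn.setAutoCommit(false)

    val select = conn.prepareStatement(
      "SELECT next_id FROM seq_offsets WHERE entity = ? FOR UPDATE")
    select.setString(1, entity)
    val rs = select.executeQuery()
    rs.next()
    val start = rs.getLong("next_id")

    val update = conn.prepareStatement(
      "UPDATE seq_offsets SET next_id = ? WHERE entity = ?")
    update.setLong(1, start + count)
    update.setString(2, entity)
    update.executeUpdate()

    conn.commit()
    start
  } finally {
    conn.close()
  }
}

// usage: two ids (Resource + Account) are consumed per input row
// val offset = reserveIds(jdbcUrl, user, pass, "resource_account", inDS.count() * 2)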
Upvotes: 1
Reputation: 41
Thanks for the reply. I have tried this method, but it doesn't give me a correlation or surrogate primary key I can use to search the database. Every time I run the ETL job the indexes/numbers will be different for each record if my dataset count changes. I need a unique ID that correlates with the DB record, matches only one record, and is always the same for that record in the DB.
Are there any good design patterns or practices for comparing an ETL dataset row to a DB record using a unique ID?
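One way to get such a stable key is to derive it deterministically from the business-key columns, for example by hashing them, so the same input row always produces the same key regardless of row count or run order. A minimal sketch with Spark's built-in sha2 and concat_ws functions, assuming the inDS from the answer above and hypothetical column names c1..c6 (this technique is a suggestion, not something from the thread):

import org.apache.spark.sql.functions.{col, concat_ws, sha2}

// Deterministic surrogate key: hash the columns that identify each entity.
// Re-running the job on the same data yields the same resource_key/account_key.
val keyedDF = inDS.toDF("c1", "c2", "c3", "c4", "c5", "c6")
  .withColumn("resource_key", sha2(concat_ws("||", col("c1"), col("c2"), col("c3")), 256))
  .withColumn("account_key",  sha2(concat_ws("||", col("c4"), col("c5"), col("c6")), 256))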
Upvotes: 1
Reputation: 10406
With dataframes, you can use the monotonicallyIncreasingId function that "generates monotonically increasing 64-bit integers" (https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.sql.functions$). It can be used this way:
dataframe.withColumn("INDEX", functions.monotonicallyIncreasingId())
With RDDs, you can use zipWithIndex or zipWithUniqueId. The former generates a real index (ordered between 0 and N-1, N being the size of the RDD), while the latter generates unique long IDs without further guarantees, which seems to be what you need (https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.rdd.RDD). Note that zipWithUniqueId does not even trigger a Spark job and is therefore almost free.
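For illustration, a minimal sketch contrasting the three options. It assumes a SparkSession named spark; the exact zipWithUniqueId values depend on partitioning, and monotonically_increasing_id is the current name of the function mentioned above.

import spark.implicits._
import org.apache.spark.sql.functions.monotonically_increasing_id

val df = Seq("a", "b", "c").toDF("value")

// monotonically increasing and unique, but not contiguous across partitions
df.withColumn("INDEX", monotonically_increasing_id()).show()

val rdd = spark.sparkContext.parallelize(Seq("a", "b", "c"), 2)

// contiguous indices 0..N-1 (triggers a job to count partition sizes)
rdd.zipWithIndex.collect()    // e.g. Array((a,0), (b,1), (c,2))

// unique but non-contiguous ids, no job triggered
rdd.zipWithUniqueId.collect() // e.g. Array((a,0), (b,1), (c,3))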
Upvotes: 1