Reputation: 2634
I have two Spark DataFrames, as below.
DF1 (previous day snapshot):
The primary key is `id` and is unique. The rest of the columns can have duplicate or unique values; it does not matter.
id | name | pincode | landmark | city |
---|---|---|---|---|
1 | Vijay1 | 411021 | Zoo1 | Pune1 |
2 | Vijay2 | 411022 | Zoo2 | null |
3 | Vijay3 | 411023 | Zoo3 | Pune3 |
4 | Vijay4 | null | Zoo4 | Pune4 |
5 | Vijay5 | 411025 | Zoo5 | Pune5 |
DF2 (new delta):
The primary key is `id` and is unique. The rest of the columns can have duplicate or unique values; it does not matter.
id | name | pincode | landmark | city | petname |
---|---|---|---|---|---|
1 | Vijay1 | 411021 | Zoo1 | Pune1 | null |
2 | Vijay2_New | null | Zoo2 | Pune2_New | null |
3 | Vijay3 | 411023 | Zoo3 | Pune3 | VJ3 |
4 | Vijay4 | 411024_New | Zoo4 | Pune4 | null |
5 | Vijay5 | 411025_New | Zoo5 | Pune5_New | null |
6 | Vijay6 | 411026 | null | Pune6 | VJ6 |
If you observe carefully:
a. New column(s) can be added in DF2; here it is `petname`.
b. New row(s) can be inserted in DF2; here the row with `id=6` is inserted.
c. Existing column values can be updated in DF2; here there are many columns whose values are updated for the same `id`. Some are changed from null to other values, and vice versa, as compared to DF1.
I need help with a Spark code snippet that will give me the differences in the columns and their values, as below. I also need the `date_time_column` value, which combines the current timestamp with the column name.
OutputDF:
id | date_time_column | column | old_value | new_value | operation_type |
---|---|---|---|---|---|
2 | 20220423_205226516_name | name | Vijay2 | Vijay2_New | update |
2 | 20220423_205226516_pincode | pincode | 411022 | null | update |
2 | 20220423_205226516_city | city | null | Pune2_New | update |
3 | 20220423_205226516_petname | petname | null | VJ3 | update |
4 | 20220423_205226516_pincode | pincode | null | 411024_New | update |
5 | 20220423_205226516_pincode | pincode | 411025 | 411025_New | update |
5 | 20220423_205226516_city | city | Pune5 | Pune5_New | update |
6 | 20220423_205226516_name | name | null | Vijay6 | insert |
6 | 20220423_205226516_pincode | pincode | null | 411026 | insert |
6 | 20220423_205226516_city | city | null | Pune6 | insert |
6 | 20220423_205226516_petname | petname | null | VJ6 | insert |
Basically, I am trying to fetch all the columns, with their old and new values, that differ between two Spark DataFrames. One DataFrame is the previous day's data snapshot and the other is the current day's delta. New rows can be inserted, and new columns can be added, which are handled by schema evolution. In the case of new row inserts, only columns with non-null values are added to the final output DataFrame.
Once found, I am going to write the final DataFrame to DynamoDB for change-audit purposes. The `id` will be the partition key and `date_time_column` will be the sort key in DynamoDB.
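For reference, a minimal sketch of that write, assuming the third-party com.audienceproject:spark-dynamodb connector is on the classpath and the target table (hypothetically named "ChangeAudit") already exists with `id` as partition key and `date_time_column` as sort key; the table name and region are assumptions, not part of the question:
//Sketch only: spark-dynamodb connector, table name, and region are assumptions
finalOutputDF.write
  .option("tableName", "ChangeAudit") //hypothetical table, keys: id + date_time_column
  .option("region", "ap-south-1")     //adjust to your AWS region
  .format("dynamodb")
  .save()
Here `finalOutputDF` is the DataFrame built in the code further down.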
I hope the question is clear. Let me know if any additional info is required. Thanks in advance for the help.
Update 1:
Below is the code that I have written to get the new rows/insert part.
import org.apache.spark.sql.functions._
import spark.implicits._

//Read Snapshot
val df1 = spark.read.option("header", "true").csv("file:///home/notroot/lab/data/snapshot.csv")
//Read Delta
val df2 = spark.read.option("header", "true").csv("file:///home/notroot/lab/data/delta.csv")
//Find new columns in Delta
val newColumnsInDf2 = df2.schema.fieldNames.diff(df1.schema.fieldNames)
//Add the new Delta columns to the Snapshot as nulls so both sides share the same schema
val df1WithNewColumnsFromDf2 = newColumnsInDf2.foldLeft(df1)((df, currCol) => df.withColumn(currCol, lit(null)))
//Find new/inserted rows: Delta rows whose id has no match in the Snapshot
val newInsertsDF = df2.as("df2Table")
  .join(df1WithNewColumnsFromDf2.as("df1WithNewColumnsFromDf2Table"),
    $"df1WithNewColumnsFromDf2Table.id" === $"df2Table.id", "left_anti")
//Build the stack() arguments, "'name', name, 'pincode', pincode, ...", for every column except id
val skipColumn = "id"
val columnCount = newInsertsDF.columns.length - 1
val columnsStr = newInsertsDF.columns
  .filterNot(_ == skipColumn)
  .map(c => s"'$c', $c")
  .mkString(",")
//Unpivot the inserted rows to (column, new_value) pairs, keeping only non-null values
val newInsertsUnpivotedDF = newInsertsDF
  .select($"id", expr(s"stack($columnCount, $columnsStr) as (column, new_value)"))
  .filter($"new_value".isNotNull)
  .withColumn("operation_type", lit("insert"))
  .withColumn("old_value", lit(null))
  .withColumn("date_time", date_format(current_timestamp(), "yyyyMMdd_HHmmssSSS"))
  .withColumn("date_time_column", concat(col("date_time"), lit("_"), col("column")))
  .select("id", "date_time_column", "column", "old_value", "new_value", "operation_type")
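For context on the unpivot step: stack(n, label1, value1, ..., labelN, valueN) turns n (label, value) argument pairs into n rows. A minimal illustration against the inserted rows, using three of the columns from the example data:
//stack() turns one wide row, e.g. id=6, name=Vijay6, pincode=411026, city=Pune6,
//into one narrow row per (label, value) pair:
//  (6, name,    Vijay6)
//  (6, pincode, 411026)
//  (6, city,    Pune6)
newInsertsDF
  .select($"id", expr("stack(3, 'name', name, 'pincode', pincode, 'city', city) as (column, new_value)"))
  .show()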
Update 2: I was able to solve this problem using the code below. I am posting this as an update and also as an answer. Let me know how we can further optimize it.
import org.apache.spark.sql.functions._
import spark.implicits._

//Read Snapshot
val df1 = spark.read.option("header", "true").csv("file:///home/notroot/lab/data/snapshot.csv")
//Read Delta
val df2 = spark.read.option("header", "true").csv("file:///home/notroot/lab/data/delta.csv")
//Find new columns in Delta
val newColumnsInDf2 = df2.schema.fieldNames.diff(df1.schema.fieldNames)
//Add the new Delta columns to the Snapshot as nulls so both sides share the same schema
val df1WithNewColumnsFromDf2 = newColumnsInDf2.foldLeft(df1)((df, currCol) => df.withColumn(currCol, lit(null)))
//Find new/inserted rows: Delta rows whose id has no match in the Snapshot
val newInsertsDF = df2.as("df2Table")
  .join(df1WithNewColumnsFromDf2.as("df1WithNewColumnsFromDf2Table"),
    $"df1WithNewColumnsFromDf2Table.id" === $"df2Table.id", "left_anti")
//Build the stack() arguments, "'name', name, 'pincode', pincode, ...", for every column except id
val skipColumn = "id"
val columnCount = newInsertsDF.columns.length - 1
val columnsStr = newInsertsDF.columns
  .filterNot(_ == skipColumn)
  .map(c => s"'$c', $c")
  .mkString(",")
//Unpivot the inserted rows to (column, new_value) pairs, keeping only non-null values
val newInsertsUnpivotedDF = newInsertsDF
  .select($"id", expr(s"stack($columnCount, $columnsStr) as (column, new_value)"))
  .filter($"new_value".isNotNull)
  .withColumn("operation_type", lit("insert"))
  .withColumn("old_value", lit(null))
  .withColumn("date_time", date_format(current_timestamp(), "yyyyMMdd_HHmmssSSS"))
  .withColumn("date_time_column", concat(col("date_time"), lit("_"), col("column")))
  .select("id", "date_time_column", "column", "old_value", "new_value", "operation_type")
//Find updated rows: unpivot the changed Snapshot rows to (column_old, old_value) pairs
val updatesInDf1Unpivoted = df1WithNewColumnsFromDf2.except(df2)
  .select($"id", expr(s"stack($columnCount, $columnsStr) as (column_old, old_value)"))
  .withColumnRenamed("id", "id_old")
//Unpivot the changed Delta rows (excluding the pure inserts) to (column, new_value) pairs
val updatesInDf2Unpivoted = df2.except(df1WithNewColumnsFromDf2).except(newInsertsDF)
  .select($"id", expr(s"stack($columnCount, $columnsStr) as (column, new_value)"))
//Keep only the (id, column) pairs whose value actually changed
val df1MinusDf2 = updatesInDf1Unpivoted.except(updatesInDf2Unpivoted)
val df2MinusDf1 = updatesInDf2Unpivoted.except(updatesInDf1Unpivoted)
//Pair the old and new value for each changed (id, column)
val joinedUpdatesDF = df1MinusDf2
  .join(df2MinusDf1,
    df1MinusDf2("id_old") === df2MinusDf1("id") && df1MinusDf2("column_old") === df2MinusDf1("column"))
  .withColumn("date_time", date_format(current_timestamp(), "yyyyMMdd_HHmmssSSS"))
  .withColumn("date_time_column", concat(col("date_time"), lit("_"), col("column")))
  .withColumn("operation_type", lit("update"))
  .select("id", "date_time_column", "column", "old_value", "new_value", "operation_type")
//Final output DF after combining Inserts and Updates
val finalOutputDF = newInsertsUnpivotedDF.union(joinedUpdatesDF)
//To display the results
finalOutputDF.show()
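On the optimization question: each except above is a full distinct-style shuffle over the data. A possible alternative (a sketch only, not benchmarked) is a single full outer join on `id` followed by a per-column, null-safe comparison; rows deleted from the Delta are filtered out, as in the original:
//Sketch: one full outer join instead of repeated except() passes
val dataCols = df2.columns.filterNot(_ == "id")
val joined = df1WithNewColumnsFromDf2.as("old")
  .join(df2.as("new"), $"old.id" === $"new.id", "full_outer")
  .filter($"new.id".isNotNull) //rows deleted from the Delta are out of scope here
//One row per (id, column), carrying old and new values side by side
val perColumn = dataCols.map { c =>
  joined.select(
    $"new.id".as("id"),
    lit(c).as("column"),
    col(s"old.$c").as("old_value"),
    col(s"new.$c").as("new_value"),
    $"old.id".isNull.as("is_insert"))
}.reduce(_ unionByName _)
val changesDF = perColumn
  .filter(!($"old_value" <=> $"new_value"))        //null-safe "value changed" test
  .filter(!$"is_insert" || $"new_value".isNotNull) //inserts keep only non-null values
  .withColumn("operation_type", when($"is_insert", lit("insert")).otherwise(lit("update")))
  .withColumn("date_time", date_format(current_timestamp(), "yyyyMMdd_HHmmssSSS"))
  .withColumn("date_time_column", concat($"date_time", lit("_"), $"column"))
  .select("id", "date_time_column", "column", "old_value", "new_value", "operation_type")
The null-safe equality operator <=> treats null <=> null as true, so unchanged null cells are not reported as updates.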
Upvotes: 1
Views: 857
Reputation: 2634
I was able to solve this using the code below:
import org.apache.spark.sql.functions._
import spark.implicits._

//Read Snapshot
val df1 = spark.read.option("header", "true").csv("file:///home/notroot/lab/data/snapshot.csv")
//Read Delta
val df2 = spark.read.option("header", "true").csv("file:///home/notroot/lab/data/delta.csv")
//Find new columns in Delta
val newColumnsInDf2 = df2.schema.fieldNames.diff(df1.schema.fieldNames)
//Add the new Delta columns to the Snapshot as nulls so both sides share the same schema
val df1WithNewColumnsFromDf2 = newColumnsInDf2.foldLeft(df1)((df, currCol) => df.withColumn(currCol, lit(null)))
//Find new/inserted rows: Delta rows whose id has no match in the Snapshot
val newInsertsDF = df2.as("df2Table")
  .join(df1WithNewColumnsFromDf2.as("df1WithNewColumnsFromDf2Table"),
    $"df1WithNewColumnsFromDf2Table.id" === $"df2Table.id", "left_anti")
//Build the stack() arguments, "'name', name, 'pincode', pincode, ...", for every column except id
val skipColumn = "id"
val columnCount = newInsertsDF.columns.length - 1
val columnsStr = newInsertsDF.columns
  .filterNot(_ == skipColumn)
  .map(c => s"'$c', $c")
  .mkString(",")
//Unpivot the inserted rows to (column, new_value) pairs, keeping only non-null values
val newInsertsUnpivotedDF = newInsertsDF
  .select($"id", expr(s"stack($columnCount, $columnsStr) as (column, new_value)"))
  .filter($"new_value".isNotNull)
  .withColumn("operation_type", lit("insert"))
  .withColumn("old_value", lit(null))
  .withColumn("date_time", date_format(current_timestamp(), "yyyyMMdd_HHmmssSSS"))
  .withColumn("date_time_column", concat(col("date_time"), lit("_"), col("column")))
  .select("id", "date_time_column", "column", "old_value", "new_value", "operation_type")
//Find updated rows: unpivot the changed Snapshot rows to (column_old, old_value) pairs
val updatesInDf1Unpivoted = df1WithNewColumnsFromDf2.except(df2)
  .select($"id", expr(s"stack($columnCount, $columnsStr) as (column_old, old_value)"))
  .withColumnRenamed("id", "id_old")
//Unpivot the changed Delta rows (excluding the pure inserts) to (column, new_value) pairs
val updatesInDf2Unpivoted = df2.except(df1WithNewColumnsFromDf2).except(newInsertsDF)
  .select($"id", expr(s"stack($columnCount, $columnsStr) as (column, new_value)"))
//Keep only the (id, column) pairs whose value actually changed
val df1MinusDf2 = updatesInDf1Unpivoted.except(updatesInDf2Unpivoted)
val df2MinusDf1 = updatesInDf2Unpivoted.except(updatesInDf1Unpivoted)
//Pair the old and new value for each changed (id, column)
val joinedUpdatesDF = df1MinusDf2
  .join(df2MinusDf1,
    df1MinusDf2("id_old") === df2MinusDf1("id") && df1MinusDf2("column_old") === df2MinusDf1("column"))
  .withColumn("date_time", date_format(current_timestamp(), "yyyyMMdd_HHmmssSSS"))
  .withColumn("date_time_column", concat(col("date_time"), lit("_"), col("column")))
  .withColumn("operation_type", lit("update"))
  .select("id", "date_time_column", "column", "old_value", "new_value", "operation_type")
//Final output DF after combining Inserts and Updates
val finalOutputDF = newInsertsUnpivotedDF.union(joinedUpdatesDF)
//To display the results
finalOutputDF.show(false)
snapshot.csv
id,name,pincode,landmark,city
1,Vijay1,411021,Zoo1,Pune1
2,Vijay2,411022,Zoo2,
3,Vijay3,411023,Zoo3,Pune3
4,Vijay4,,Zoo4,Pune4
5,Vijay5,411025,Zoo5,Pune5
delta.csv
id,name,pincode,landmark,city,petname
1,Vijay1,411021,Zoo1,Pune1,
2,Vijay2_New,,Zoo2,Pune2_New,
3,Vijay3,411023,Zoo3,Pune3,VJ3
4,Vijay4,411024_New,Zoo4,Pune4,
5,Vijay5,411025_New,Zoo5,Pune5_New,
6,Vijay6,411026,,Pune6,VJ6
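Note that Spark's CSV reader maps empty fields to null by default (the nullValue option defaults to the empty string), which is why the blank trailing fields above come through as null in the comparison. A sketch making that default explicit (deltaDF is just an illustrative name):
//"" is already the default nullValue; shown here only for clarity
val deltaDF = spark.read
  .option("header", "true")
  .option("nullValue", "")
  .csv("file:///home/notroot/lab/data/delta.csv")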
The result is as below:
+---+--------------------------+-------+---------+----------+--------------+
|id |date_time_column |column |old_value|new_value |operation_type|
+---+--------------------------+-------+---------+----------+--------------+
|6 |20220423_210923191_name |name |null |Vijay6 |insert |
|6 |20220423_210923191_pincode|pincode|null |411026 |insert |
|6 |20220423_210923191_city |city |null |Pune6 |insert |
|6 |20220423_210923191_petname|petname|null |VJ6 |insert |
|3 |20220423_210923191_petname|petname|null |VJ3 |update |
|4 |20220423_210923191_pincode|pincode|null |411024_New|update |
|2 |20220423_210923191_name |name |Vijay2 |Vijay2_New|update |
|2 |20220423_210923191_city |city |null |Pune2_New |update |
|5 |20220423_210923191_pincode|pincode|411025 |411025_New|update |
|5 |20220423_210923191_city |city |Pune5 |Pune5_New |update |
|2 |20220423_210923191_pincode|pincode|411022 |null |update |
+---+--------------------------+-------+---------+----------+--------------+
Upvotes: 2