vijayinani

Reputation: 2634

Get the columns and their values after comparing two Spark DataFrames, where values differ or rows/columns are new

I have two Spark DataFrames, as below.

DF1 (previous day snapshot):

The primary key is id and is unique. The rest of the columns can have duplicate or unique values; it does not matter.

id  name    pincode  landmark  city
1   Vijay1  411021   Zoo1      Pune1
2   Vijay2  411022   Zoo2      null
3   Vijay3  411023   Zoo3      Pune3
4   Vijay4  null     Zoo4      Pune4
5   Vijay5  411025   Zoo5      Pune5

DF2 (new delta):

The primary key is id and is unique. The rest of the columns can have duplicate or unique values; it does not matter.

id  name        pincode     landmark  city       petname
1   Vijay1      411021      Zoo1      Pune1      null
2   Vijay2_New  null        Zoo2      Pune2_New  null
3   Vijay3      411023      Zoo3      Pune3      VJ3
4   Vijay4      411024_New  Zoo4      Pune4      null
5   Vijay5      411025_New  Zoo5      Pune5_New  null
6   Vijay6      411026      null      Pune6      VJ6

If you observe carefully:

a. New column(s) can be added in DF2; here it is petname.
b. New row(s) can be inserted in DF2; here the row with id=6 is inserted.
c. Existing column values can be updated in DF2; here several columns are updated for the same id. Some change from null to a value and vice versa, as compared to DF1.

I need help with a Spark code snippet that will give me the differences in the columns and their values, as below. I also need a date_time_column value, which combines the current date-time with the column name.

OutputDF:

id  date_time_column            column   old_value  new_value   operation_type
2   20220423_205226516_name     name     Vijay2     Vijay2_New  update
2   20220423_205226516_pincode  pincode  411022     null        update
2   20220423_205226516_city     city     null       Pune2_New   update
3   20220423_205226516_petname  petname  null       VJ3         update
4   20220423_205226516_pincode  pincode  null       411024_New  update
5   20220423_205226516_pincode  pincode  411025     411025_New  update
5   20220423_205226516_city     city     Pune5      Pune5_New   update
6   20220423_205226516_name     name     null       Vijay6      insert
6   20220423_205226516_pincode  pincode  null       411026      insert
6   20220423_205226516_city     city     null       Pune6       insert
6   20220423_205226516_petname  petname  null       VJ6         insert

Basically, I am trying to fetch all the columns and their old and new values that differ between two Spark DataFrames. One DataFrame is the previous day's snapshot and the other is the current day's delta. New rows can be inserted, and new columns can be added; these are handled by schema evolution. In the case of newly inserted rows, only columns with non-null values are added to the final output DataFrame.

Once found, I am going to write the final DataFrame to DynamoDB for change-audit purposes. The id will be the partition key and date_time_column will be the sort key in DynamoDB.
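
For reference, one minimal sketch of such an audit write, using the AWS SDK v2 for Java directly from foreachPartition. The table name change_audit is my assumption for illustration only, and finalOutputDF is the result DataFrame built later in this post:

import scala.collection.JavaConverters._  // scala.jdk.CollectionConverters._ on Scala 2.13
import software.amazon.awssdk.services.dynamodb.DynamoDbClient
import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, PutItemRequest}

finalOutputDF.foreachPartition { rows: Iterator[org.apache.spark.sql.Row] =>
  val client = DynamoDbClient.create()  // one client per partition, not per row
  rows.foreach { row =>
    // Skip null attributes; only non-null fields are written, and the key
    // attributes id and date_time_column are always non-null.
    val item = row.schema.fieldNames.flatMap { f =>
      Option(row.getAs[String](f)).map(v => f -> AttributeValue.builder().s(v).build())
    }.toMap
    // "change_audit" is an assumed table name for this sketch
    client.putItem(PutItemRequest.builder().tableName("change_audit").item(item.asJava).build())
  }
  client.close()
}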

Hope the question is clear. Let me know if any additional info is required. Thanks for the help in advance.

Update 1:

Below is the code that I have written to get the new rows/insert part.

import org.apache.spark.sql.functions._
import spark.implicits._

//Read Snapshot
val df1 = spark.read.option("header","true").csv("file:///home/notroot/lab/data/snapshot.csv")

//Read Delta
val df2 = spark.read.option("header","true").csv("file:///home/notroot/lab/data/delta.csv")

//Find new columns in Delta
val newColumnsInDf2 = df2.schema.fieldNames.diff(df1.schema.fieldNames)

//Add new columns from Delta to Snapshot (as nulls)
val df1WithNewColumnsFromDf2 = newColumnsInDf2.foldLeft(df1)((df, currCol) => df.withColumn(currCol, lit(null)))

//Find new/inserted rows from Delta
val newInsertsDF = df2.as("df2Table")
  .join(df1WithNewColumnsFromDf2.as("df1WithNewColumnsFromDf2Table"),
    $"df1WithNewColumnsFromDf2Table.id" === $"df2Table.id", "LEFT_ANTI")

//Convert new/inserted rows into the desired format
val skipColumn = "id"
val stackCols = newInsertsDF.columns.filter(_ != skipColumn)
val columnCount = stackCols.length
val columnsStr = stackCols.map(c => s"'$c', $c").mkString(", ")
val newInsertsUnpivotedDF = newInsertsDF
  .select($"id", expr(s"stack($columnCount, $columnsStr) as (column, new_value)"))
  .filter($"new_value".isNotNull)
  .withColumn("operation_type", lit("insert"))
  .withColumn("old_value", lit(null))
  .withColumn("date_time", date_format(current_timestamp(), "yyyyMMdd_HHmmssSSS"))
  .withColumn("date_time_column", concat(col("date_time"), lit("_"), col("column")))
  .select("id", "date_time_column", "column", "old_value", "new_value", "operation_type")
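
For clarity, with the sample schema the generated expression is stack(5, 'name', name, 'pincode', pincode, 'landmark', landmark, 'city', city, 'petname', petname): each inserted row is unpivoted into one (column, new_value) pair per non-id column. A standalone toy example of the same idea (demoDF is made-up data for illustration):

val demoDF = Seq(("6", "Vijay6", "411026")).toDF("id", "name", "pincode")
// Two rows out: (6, name, Vijay6) and (6, pincode, 411026)
demoDF.select($"id", expr("stack(2, 'name', name, 'pincode', pincode) as (column, new_value)")).show()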

Update 2: I was able to solve this problem. The full code is posted below as an answer, so see there for the complete solution. Let me know how we can further optimize it.

Upvotes: 1

Views: 857

Answers (1)

vijayinani

Reputation: 2634

I was able to solve this using the code below:

import org.apache.spark.sql.functions._
import spark.implicits._

//Read Snapshot
val df1 = spark.read.option("header","true").csv("file:///home/notroot/lab/data/snapshot.csv")

//Read Delta
val df2 = spark.read.option("header","true").csv("file:///home/notroot/lab/data/delta.csv")

//Find new columns in Delta
val newColumnsInDf2 = df2.schema.fieldNames.diff(df1.schema.fieldNames)

//Add new columns from Delta to Snapshot (as nulls)
val df1WithNewColumnsFromDf2 = newColumnsInDf2.foldLeft(df1)((df, currCol) => df.withColumn(currCol, lit(null)))

//Find new/inserted rows from Delta
val newInsertsDF = df2.as("df2Table")
  .join(df1WithNewColumnsFromDf2.as("df1WithNewColumnsFromDf2Table"),
    $"df1WithNewColumnsFromDf2Table.id" === $"df2Table.id", "LEFT_ANTI")

//Convert new/inserted rows into the desired format
val skipColumn = "id"
val stackCols = newInsertsDF.columns.filter(_ != skipColumn)
val columnCount = stackCols.length
val columnsStr = stackCols.map(c => s"'$c', $c").mkString(", ")
val newInsertsUnpivotedDF = newInsertsDF
  .select($"id", expr(s"stack($columnCount, $columnsStr) as (column, new_value)"))
  .filter($"new_value".isNotNull)
  .withColumn("operation_type", lit("insert"))
  .withColumn("old_value", lit(null))
  .withColumn("date_time", date_format(current_timestamp(), "yyyyMMdd_HHmmssSSS"))
  .withColumn("date_time_column", concat(col("date_time"), lit("_"), col("column")))
  .select("id", "date_time_column", "column", "old_value", "new_value", "operation_type")

//Find updated rows in Delta
val updatesInDf1Unpivoted = df1WithNewColumnsFromDf2.except(df2)
  .select($"id", expr(s"stack($columnCount, $columnsStr) as (column_old, old_value)"))
  .withColumnRenamed("id", "id_old")
val updatesInDf2Unpivoted = df2.except(df1WithNewColumnsFromDf2).except(newInsertsDF)
  .select($"id", expr(s"stack($columnCount, $columnsStr) as (column, new_value)"))
val df1MinusDf2 = updatesInDf1Unpivoted.except(updatesInDf2Unpivoted)
val df2MinusDf1 = updatesInDf2Unpivoted.except(updatesInDf1Unpivoted)
val joinedUpdatesDF = df1MinusDf2
  .join(df2MinusDf1, df1MinusDf2("id_old") === df2MinusDf1("id") && df1MinusDf2("column_old") === df2MinusDf1("column"))
  .withColumn("date_time", date_format(current_timestamp(), "yyyyMMdd_HHmmssSSS"))
  .withColumn("date_time_column", concat(col("date_time"), lit("_"), col("column")))
  .withColumn("operation_type", lit("update"))
  .select("id", "date_time_column", "column", "old_value", "new_value", "operation_type")

//Final output DF after combining Inserts and Updates
val finalOutputDF = newInsertsUnpivotedDF.union(joinedUpdatesDF)

//To display the results
finalOutputDF.show(false)

snapshot.csv

id,name,pincode,landmark,city
1,Vijay1,411021,Zoo1,Pune1
2,Vijay2,411022,Zoo2,
3,Vijay3,411023,Zoo3,Pune3
4,Vijay4,,Zoo4,Pune4
5,Vijay5,411025,Zoo5,Pune5

delta.csv

id,name,pincode,landmark,city,petname
1,Vijay1,411021,Zoo1,Pune1,
2,Vijay2_New,,Zoo2,Pune2_New,
3,Vijay3,411023,Zoo3,Pune3,VJ3
4,Vijay4,411024_New,Zoo4,Pune4,
5,Vijay5,411025_New,Zoo5,Pune5_New,
6,Vijay6,411026,,Pune6,VJ6

The result is as below:

+---+--------------------------+-------+---------+----------+--------------+
|id |date_time_column          |column |old_value|new_value |operation_type|
+---+--------------------------+-------+---------+----------+--------------+
|6  |20220423_210923191_name   |name   |null     |Vijay6    |insert        |
|6  |20220423_210923191_pincode|pincode|null     |411026    |insert        |
|6  |20220423_210923191_city   |city   |null     |Pune6     |insert        |
|6  |20220423_210923191_petname|petname|null     |VJ6       |insert        |
|3  |20220423_210923191_petname|petname|null     |VJ3       |update        |
|4  |20220423_210923191_pincode|pincode|null     |411024_New|update        |
|2  |20220423_210923191_name   |name   |Vijay2   |Vijay2_New|update        |
|2  |20220423_210923191_city   |city   |null     |Pune2_New |update        |
|5  |20220423_210923191_pincode|pincode|411025   |411025_New|update        |
|5  |20220423_210923191_city   |city   |Pune5    |Pune5_New |update        |
|2  |20220423_210923191_pincode|pincode|411022   |null      |update        |
+---+--------------------------+-------+---------+----------+--------------+
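
On the optimization point: one possible alternative (a sketch, untested at scale) is to replace the repeated except passes with a single full outer join on id plus a null-safe comparison per column. Deletes (rows only in the snapshot) remain out of scope, as in the original:

val dataCols = df2.columns.filter(_ != "id")

val joined = df1WithNewColumnsFromDf2.as("old")
  .join(df2.as("new"), col("old.id") === col("new.id"), "full_outer")

// One struct per data column; null unless old and new differ (null-safe <=>).
// For inserts, the old side is entirely null, so only non-null new values survive.
val diffs = dataCols.map { c =>
  when(not(col(s"old.$c") <=> col(s"new.$c")),
    struct(lit(c).as("column"), col(s"old.$c").as("old_value"), col(s"new.$c").as("new_value")))
}

val diffDF = joined
  .withColumn("operation_type", when(col("old.id").isNull, "insert").otherwise("update"))
  .select(coalesce(col("new.id"), col("old.id")).as("id"),
    explode(array(diffs: _*)).as("d"), col("operation_type"))
  .filter(col("d").isNotNull)
  .select(col("id"), col("d.column").as("column"), col("d.old_value").as("old_value"),
    col("d.new_value").as("new_value"), col("operation_type"))

The date_time and date_time_column columns can then be derived exactly as in the code above; this computes inserts and updates in one join instead of several wide except passes.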

Upvotes: 2
