Reputation: 421
Is there a way I can set up a flag column (I = inserted, U = updated) when I perform the delta merge logic? I'm curious to know how many records got inserted and how many got updated in the daily delta merge.
My sample dataframe:
df_latest = spark.createDataFrame(
    [
        ('Java', '20000'),    # create your data here, be consistent in the types
        ('Scala', '90000'),
        ('Python', '100000')
    ],
    ["language", "users_count"]  # add your column names here
)
When I perform the delta merge logic below, I need one more column called flag (with I or U), so that on version 2 of the delta table I can tell how many rows got inserted and how many got updated.
test_delta.alias("h") \
    .merge(df_latest.alias("df"), "h.language = df.language") \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
Any help would be appreciated; I couldn't figure this out myself!
Upvotes: 2
Views: 1013
Reputation: 36
I was looking for a similar technique and stumbled on your question. Luckily, I was able to solve it using the approach described below:
1-) Create a Delta table and define its schema.
from delta.tables import *
from pyspark.sql.types import StringType, IntegerType

DeltaTable.createIfNotExists(spark) \
    .addColumn("language", StringType()) \
    .addColumn("users_count", IntegerType()) \
    .addColumn("Flag", StringType()) \
    .property("description", "Testing Flag Logic") \
    .location("/mnt/output/TestingFlagLogic") \
    .execute()
[Screenshot: snapshot of the table after creation]
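Since the screenshot is no longer visible, a quick sanity check against the same path prints the schema of the freshly created, still empty table:
# Inspect the schema of the newly created (empty) Delta table
spark.read.format("delta").load("/mnt/output/TestingFlagLogic").printSchema()
# root
#  |-- language: string (nullable = true)
#  |-- users_count: integer (nullable = true)
#  |-- Flag: string (nullable = true)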
2-) Use the command below to insert into the table.
from delta.tables import *
from pyspark.sql import functions as F

deltaTable = DeltaTable.forPath(spark, "/mnt/output/TestingFlagLogic")
deltaTable.alias("Destination") \
    .merge(
        df.alias("Updates"),  # df is the source DataFrame (df_latest from the question)
        "Destination.language = Updates.language") \
    .whenMatchedUpdate(set =
        {
            "language": "Updates.language",
            "users_count": "Updates.users_count",
            "Flag": F.lit("U")
        }) \
    .whenNotMatchedInsert(values =
        {
            "language": "Updates.language",
            "users_count": "Updates.users_count",
            "Flag": F.lit("I")
        }) \
    .execute()
3-) After the first insertion, the table has a Flag column populated with the value "I" for every row. [Screenshot: Delta table after first insertion]
4-) Then you define a new DataFrame with the values you want to update. Here I am doubling the users count of "Python" and adding a new language, "C++".
df_updated = spark.createDataFrame(
    [
        ('Python', '200000'),
        ('C++', '300000'),
    ],
    ["language", "users_count"]  # add your column names here
)
[Screenshot: snapshot of the DataFrame with the values to update]
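In place of the screenshot, a quick show() call reproduces the same view:
df_updated.show()
# +--------+-----------+
# |language|users_count|
# +--------+-----------+
# |  Python|     200000|
# |     C++|     300000|
# +--------+-----------+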
5-) Now merge using the same logic described in step 2, just replacing df with df_updated.
from delta.tables import *
from pyspark.sql import functions as F

deltaTable = DeltaTable.forPath(spark, "/mnt/output/TestingFlagLogic")
deltaTable.alias("Destination") \
    .merge(
        df_updated.alias("Updates"),
        "Destination.language = Updates.language") \
    .whenMatchedUpdate(set =
        {
            "language": "Updates.language",
            "users_count": "Updates.users_count",
            "Flag": F.lit("U")
        }) \
    .whenNotMatchedInsert(values =
        {
            "language": "Updates.language",
            "users_count": "Updates.users_count",
            "Flag": F.lit("I")
        }) \
    .execute()
6-) Congrats, you have successfully achieved the desired functionality. Now query your Delta table and display it for visual verification.
df_new = spark.read.format("delta").load("/mnt/output/TestingFlagLogic")
display(df_new)
[Screenshot: snapshot of the Delta table after the update]
You can see that "Python" now carries the doubled users count with a "Flag" value of "U" (it matched an existing row and was updated), while "C++" comes in as a new row with a "Flag" of "I" (it did not match and was inserted), which is exactly how an upsert (update + insert) behaves.
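To get the actual counts the question asks for, a simple aggregation over the Flag column works; a minimal sketch against the same path. One caveat: rows untouched by the latest merge keep their old flag, so this counts cumulative flags, not just the last run:
(spark.read.format("delta")
    .load("/mnt/output/TestingFlagLogic")
    .groupBy("Flag")
    .count()
    .show())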
Upvotes: 1
Reputation: 87144
If you just need metrics, then you can retrieve that information from the history of the table (either via the DESCRIBE HISTORY SQL command, or via the history function). Both return a dataframe that contains operation metrics (the operationMetrics column, which is a map), and for the MERGE operation there are metrics that describe how many rows were inserted/updated/deleted: numTargetRowsInserted, numTargetRowsUpdated, numTargetRowsDeleted.
Something like this (only for the last version - if you need all versions, just remove the 1 from the history call):
from delta.tables import *

deltaTable = DeltaTable.forName(spark, "mytable")
df = deltaTable.history(1)
df.select(df["operationMetrics"]["numTargetRowsInserted"],
          df["operationMetrics"]["numTargetRowsUpdated"],
          df["operationMetrics"]["numTargetRowsDeleted"])
Upvotes: 0