Reputation: 9724
In our current on-prem ETL we stage incremental data (based on a date-modified column, tracked in a control table) and UPSERT it, that is, update existing records in the final table and insert new records.
I'm trying to understand the equivalent approach in an Azure scenario.
For example: say that, based on a control table, I fetch the most recently modified data and land it in the bronze layer. How do I then incrementally load (UPSERT) this data into the silver layer?
Note: I'm using Azure Data Lake Storage Gen2; the bronze layer format is Parquet and the silver layer format is Delta.
Upvotes: 0
Views: 267
Reputation: 3250
Here is how you can create the control table with an initial last_processed_date using a Spark notebook:
from pyspark.sql import Row

# Path to the control table in ADLS Gen2 (placeholder container/account)
control_table_path = 'abfss://<container>@<storage_account>.dfs.core.windows.net/control_table'

# Seed the control table with an initial watermark
initial_data = [Row(last_processed_date="2023-09-01")]
control_df = spark.createDataFrame(initial_data)
control_df.write.format("delta").mode("overwrite").save(control_table_path)
print("Control table created with initial data.")

# Read the watermark back for use in the incremental load
control_df = spark.read.format("delta").load(control_table_path)
last_processed_date = control_df.collect()[0]["last_processed_date"]
print(f"Last processed date: {last_processed_date}")
Results:
Last processed date: 2023-09-01
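Next, read the incremental slice from the bronze layer using that watermark. This is a minimal sketch: bronze_path and the last_modified column name are assumptions, so adjust them to match your staging layout.
from pyspark.sql import functions as F

# Assumed location of the parquet files staged in the bronze layer
bronze_path = 'abfss://<container>@<storage_account>.dfs.core.windows.net/bronze/my_table'

# Keep only the rows modified after the stored watermark
incremental_df = spark.read.parquet(bronze_path) \
    .filter(F.col("last_modified") > last_processed_date)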
You mentioned you want to implement the incremental load (UPSERT) into the silver layer.
I have tried the approach below:
from delta.tables import DeltaTable

# Assumed location of the silver Delta table
delta_table_path = 'abfss://<container>@<storage_account>.dfs.core.windows.net/silver/my_table'

# Load the silver Delta table and UPSERT: update matched rows, insert new ones
silver_table = DeltaTable.forPath(spark, delta_table_path)
silver_table.alias("silver").merge(
    incremental_df.alias("bronze"),
    "silver.id = bronze.id"
).whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
# Verify the contents of the silver table after the merge
result_df = spark.read.format("delta").load(delta_table_path)
result_df.show()
In the above code:
- DeltaTable.forPath loads the silver Delta table
- merge performs the UPSERT (Delta's MERGE INTO command) using the matching condition on the id column
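If you prefer SQL, the same UPSERT can be expressed with a MERGE INTO statement. A sketch, registering the incremental DataFrame as a temp view (bronze_updates is just an illustrative name):
incremental_df.createOrReplaceTempView("bronze_updates")

spark.sql(f"""
    MERGE INTO delta.`{delta_table_path}` AS silver
    USING bronze_updates AS bronze
    ON silver.id = bronze.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")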
Results:
+---+------+---+-------------+
| id| name|age|last_modified|
+---+------+---+-------------+
| 2|Thomas| 29| 2023-08-26|
| 1| Jerry| 24| 2023-08-25|
+---+------+---+-------------+
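Finally, once the merge succeeds, advance the watermark in the control table so the next run only picks up newer rows. A minimal sketch, reusing control_table_path and the single-row layout from above:
from pyspark.sql import Row, functions as F

# Highest modification date that was just processed
new_watermark = incremental_df.agg(F.max("last_modified")).collect()[0][0]

# Overwrite the single-row control table with the new watermark
spark.createDataFrame([Row(last_processed_date=new_watermark)]) \
    .write.format("delta").mode("overwrite").save(control_table_path)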
Upvotes: 0