Reputation: 695
We have Azure Synapse Link for Dataverse, which enables continuous export of data from Dataverse to Azure Data Lake Storage Gen2 (CSV format).
The main source of data is D365 CRM. All the files contain the columns SinkModifiedOn and IsDelete.
The current solution reads all the CSV files using a Databricks Python notebook and creates DataFrames from them, then converts those DataFrames to PARQUET files and loads them into another data lake.
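Simplified, that conversion step looks roughly like this (the paths and the account table name are placeholders, not our real ones):
# Read the Synapse Link CSV export for one Dataverse table (placeholder path)
df = (
    spark.read
    .option("header", True)
    .csv("abfss://<container>@<source_storage>.dfs.core.windows.net/account/")
)
# Write the full data set out again as Parquet into the second data lake (placeholder path)
(
    df.write
    .mode("overwrite")
    .parquet("abfss://<schema>@<target_storage>.dfs.core.windows.net/<source>/account/")
)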
The next step is to read the PARQUET files using the code below. So it always performs a full load from CSV to Parquet, and the created tables are in PARQUET format (not DELTA):
%sql
USE CATALOG ${personal.catalog};
CREATE SCHEMA IF NOT EXISTS ${personal.schema};
-- DROP TABLE IF EXISTS ${personal.schema}.${personal.source}_${personal.table};
CREATE TABLE IF NOT EXISTS ${personal.schema}.${personal.source}_${personal.table}
USING PARQUET
LOCATION 'abfss://${personal.schema}@${personal.storage_account_name}.dfs.core.windows.net/${personal.source}/${personal.table}'
OPTIONS(recursiveFileLookup = true);
ALTER TABLE ${personal.schema}.${personal.source}_${personal.table} SET TBLPROPERTIES (
'delta.columnMapping.mode' = 'name',
'delta.minReaderVersion' = '2',
'delta.minWriterVersion' = '5')
My goal:
I would like to achieve an incremental load, but I'm not sure how to do so. My thinking is to create a watermark table, identify the changed rows in the CSV files, and create a DataFrame out of them. Next, output those rows as PARQUET files as in the current workflow and load them into the other data lake. As for the next steps, I'm not sure how to continue.
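Roughly, the incremental step I have in mind would look something like this (just a sketch; the ctrl.watermark control table, the paths, and the account table name are placeholders I made up, not our real objects):
from pyspark.sql import functions as F
# `spark` is the Databricks notebook session; `ctrl.watermark` is a hypothetical control table
last_watermark = (
    spark.table("ctrl.watermark")
    .filter(F.col("table_name") == "account")
    .agg(F.max("watermark_value"))
    .collect()[0][0]
)
# Read the exported CSVs and keep only the rows changed since the last run
incremental_df = (
    spark.read.option("header", True)
    .csv("abfss://<container>@<source_storage>.dfs.core.windows.net/account/")
    .filter(F.col("SinkModifiedOn") > F.lit(last_watermark))
)
# Write only the changed rows as Parquet into the other data lake, as in the current flow
incremental_df.write.mode("append").parquet("abfss://<schema>@<target_storage>.dfs.core.windows.net/<source>/account/")
# Record the new watermark for the next run (no concurrency handling in this sketch)
new_watermark = incremental_df.agg(F.max("SinkModifiedOn")).collect()[0][0]
spark.createDataFrame([("account", new_watermark)], ["table_name", "watermark_value"]) \
    .write.mode("append").saveAsTable("ctrl.watermark")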
Any help or tips on how I can achieve this will be appreciated.
Upvotes: 0
Views: 1340
Reputation: 3250
I tried to reproduce this and see whether I can do a direct incremental load on a Parquet file from ADLS Gen2.
Here is the code and the error message from that attempt:
Also, since you are creating the table in Databricks, the table metadata, such as schema information, table name, and other metadata, is managed by the Databricks catalog, which is typically stored in a Hive metastore.
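For example, you can inspect that metadata from a notebook cell (using the example Movies table mentioned below):
# Shows the schema, provider and location information recorded in the metastore
spark.sql("DESCRIBE TABLE EXTENDED Movies").show(truncate=False)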
I tried the incremental load, computing the watermark_value as the maximum SinkModifiedOn from the temp_incremental_movies DataFrame.
For example, I have a table called Movies that consists of 10 records, and as part of the incremental load I used a MERGE statement; the MERGE statement performs the incremental load. SQL Merge Operation Using Pyspark
In the new, updated Parquet file I have 2 records inserted, 1 row deleted, and 1 row updated. Full Load File:
To capture the changes, the code below uses a MERGE statement:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, col
spark = SparkSession.builder.appName("Merge Parquet to Movies Table").getOrCreate()
parquet_file_path = 'abfss://[email protected]/Parquet_folder/increment_movies/part-00000-tid-6210101209940006132-a9d0ffbb-5cd4-426b-9811-4241b4724ad4-5-1-c000.snappy.parquet'
df1 = spark.read.parquet(parquet_file_path)
# Register the incremental DataFrame as a temp view so the MERGE statement can reference it
df1.createOrReplaceTempView("temp_incremental_movies")
# Converting the target (full-load) Movies table to a Delta table
target_table_name = "Movies"
delta_table_path = "abfss://[email protected]/delta_table"
spark.table(target_table_name).write.format("delta").mode("overwrite").save(delta_table_path)
# Watermark column
watermark_column = "SinkModifiedOn"
watermark_threshold = "30 days"
# Watermark value based on the maximum SinkModifiedOn from the temp_incremental_movies DataFrame
watermark_value = df1.agg({"SinkModifiedOn": "max"}).collect()[0][0]
# MERGE statement to perform the UPSERT operation
merge_statement = f"""
MERGE INTO delta.`{delta_table_path}` AS target
USING temp_incremental_movies AS source
ON target.movie_id = source.movie_id -- Condition to match records based on movie_id
WHEN MATCHED AND source.{watermark_column} >= target.{watermark_column} THEN UPDATE SET
target.movie_title = source.movie_title,
target.release_year = source.release_year,
target.director = source.director,
target.genre = source.genre,
target.SinkModifiedOn = source.SinkModifiedOn,
target.IsDelete = source.IsDelete
WHEN NOT MATCHED AND source.{watermark_column} >= '{watermark_value}' THEN INSERT *
"""
spark.sql(merge_statement)
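Because your source also carries the IsDelete flag, the same MERGE can be extended with a delete clause. The sketch below assumes IsDelete arrives as a boolean column in the source:
# Sketch only: physically remove rows that the source marks as deleted (assumes IsDelete is boolean)
merge_with_deletes = f"""
MERGE INTO delta.`{delta_table_path}` AS target
USING temp_incremental_movies AS source
ON target.movie_id = source.movie_id
WHEN MATCHED AND source.IsDelete = true THEN DELETE
WHEN MATCHED AND source.{watermark_column} >= target.{watermark_column} THEN UPDATE SET *
WHEN NOT MATCHED AND source.IsDelete = false THEN INSERT *
"""
spark.sql(merge_with_deletes)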
In the image below you can see the storage format for both the Delta table and the Parquet table: both are saved as snappy Parquet files.
The Delta format is built on top of Parquet and manages data more effectively in big data environments.
Delta stores the data as Parquet files only; it just adds an additional layer with advanced features, such as a transaction log that provides a history of events, plus update, delete, and merge capabilities for changing data.
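For example, once the target above is a Delta table, you can look at that transaction log directly (re-using the delta_table_path from the code above):
# Each overwrite/MERGE appears as a versioned entry in the Delta transaction log
spark.sql(f"DESCRIBE HISTORY delta.`{delta_table_path}`").show(truncate=False)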
Know more about DeltaTables
Upvotes: 0