Greencolor

Reputation: 695

How can I apply incremental data loading into an Azure Databricks table from Azure ADLS Gen2?

We have Azure Synapse Link for Dataverse, which enables continuous export of data from Dataverse to Azure Data Lake Storage Gen2 (CSV format).

The main source of data is D365 CRM. All the files contain the columns SinkModifiedOn and IsDelete.

The current solution reads all the CSV files in a Databricks Python notebook and creates DataFrames from them. These DataFrames are then written out as PARQUET files and loaded into another data lake, roughly as in the sketch below.
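
A rough sketch of that notebook step (the storage account, container names, and paths here are simplified placeholders, not the real ones):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CSV to Parquet full load").getOrCreate()

# Placeholder paths - the real container / storage account names differ
csv_path = "abfss://source-container@<storage_account>.dfs.core.windows.net/dataverse/<table>/"
parquet_path = "abfss://target-container@<storage_account>.dfs.core.windows.net/<source>/<table>/"

# Read every CSV exported by Synapse Link (full load on each run)
df = spark.read.csv(csv_path, header=True, inferSchema=True)

# Write the whole DataFrame back out as Parquet into the second data lake
df.write.mode("overwrite").parquet(parquet_path)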

The next step is to read the PARQUET files using the code below. So it is always a full load from CSV to Parquet, and the created tables are in PARQUET format (not DELTA):

%sql

USE CATALOG ${personal.catalog};

CREATE SCHEMA IF NOT EXISTS ${personal.schema};

-- DROP TABLE IF EXISTS ${personal.schema}.${personal.source}_${personal.table};

CREATE TABLE IF NOT EXISTS ${personal.schema}.${personal.source}_${personal.table}
USING PARQUET
LOCATION 'abfss://${personal.schema}@${personal.storage_account_name}.dfs.core.windows.net/${personal.source}/${personal.table}'
OPTIONS(recursiveFileLookup = true);

ALTER TABLE ${personal.schema}.${personal.source}_${personal.table} SET TBLPROPERTIES (
   'delta.columnMapping.mode' = 'name',
   'delta.minReaderVersion' = '2',
   'delta.minWriterVersion' = '5')

My goal:

I would like to achieve an incremental load, but I'm not sure how to do so. My idea is to create a watermark table, identify the changed rows in the CSV files, and create a DataFrame from just those rows. Then output those as PARQUET files, as in the current workflow, and load them into the other data lake; a rough sketch of what I have in mind is below. I'm not sure how to continue from there.
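
A rough sketch of the change detection I have in mind (the watermark table, container names, and the "account" table name are just placeholders):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("Incremental CSV to Parquet").getOrCreate()

# Placeholder watermark table: one row per source table with the last processed SinkModifiedOn
last_watermark = (spark.table("control.watermarks")
                  .filter(col("table_name") == "account")
                  .first()["last_sink_modified_on"])

# Read the Synapse Link CSVs and keep only rows changed since the last run
df = spark.read.csv(
    "abfss://source-container@<storage_account>.dfs.core.windows.net/dataverse/account/",
    header=True, inferSchema=True)
df_changes = df.filter(col("SinkModifiedOn") > last_watermark)

# Write only the changed rows as Parquet into the second data lake;
# the watermark table would then be updated with max(SinkModifiedOn) of df_changes
df_changes.write.mode("append").parquet(
    "abfss://target-container@<storage_account>.dfs.core.windows.net/incremental/account/")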

Any help or tips on how I can achieve this will be appreciated.

Upvotes: 0

Views: 1340

Answers (1)

I tried to reproduce this to see whether a direct incremental load on a Parquet file from ADLS Gen2 is possible. Here are the code and the error message from that attempt:

[screenshots: attempted code and resulting error message]

Also, as you are creating the table in Databricks, the table metadata, such as the schema, table name, and other properties, is managed by the Databricks catalog, which is typically backed by a Hive metastore.

I then tried the incremental load, deriving the watermark_value from the maximum SinkModifiedOn in the temp_incremental_movies DataFrame.

For example, I have a table called Movies that consists of 10 records, and as part of the incremental load I used a MERGE statement; the MERGE statement performs the incremental load. See also: SQL Merge Operation Using Pyspark.

In the new, updated Parquet file I have 2 records inserted, 1 row deleted, and 1 row updated.

[screenshot: full-load file]

To capture the changes, the code below uses a MERGE statement:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Merge Parquet to Movies Table").getOrCreate()

# Incremental Parquet file exported to ADLS Gen2
parquet_file_path = 'abfss://[email protected]/Parquet_folder/increment_movies/part-00000-tid-6210101209940006132-a9d0ffbb-5cd4-426b-9811-4241b4724ad4-5-1-c000.snappy.parquet'
df1 = spark.read.parquet(parquet_file_path)

# Register the incremental data as the source for the MERGE
df1.createOrReplaceTempView("temp_incremental_movies")

# Converting the target table to a Delta table (one-time step:
# the existing full-load Movies table is rewritten as Delta)
target_table_name = "Movies"
delta_table_path = "abfss://[email protected]/delta_table"
spark.table(target_table_name).write.format("delta").mode("overwrite").save(delta_table_path)

# Watermark column (the 30-day threshold is defined but not used below)
watermark_column = "SinkModifiedOn"
watermark_threshold = "30 days"

# Watermark value based on the maximum SinkModifiedOn in the temp_incremental_movies DataFrame
watermark_value = df1.agg({"SinkModifiedOn": "max"}).collect()[0][0]

# MERGE statement to perform the UPSERT operation
merge_statement = f"""
MERGE INTO delta.`{delta_table_path}` AS target
USING temp_incremental_movies AS source
ON target.movie_id = source.movie_id -- condition to match records based on movie_id
WHEN MATCHED AND source.{watermark_column} >= target.{watermark_column} THEN UPDATE SET
  target.movie_title = source.movie_title,
  target.release_year = source.release_year,
  target.director = source.director,
  target.genre = source.genre,
  target.SinkModifiedOn = source.SinkModifiedOn,
  target.IsDelete = source.IsDelete
WHEN NOT MATCHED AND source.{watermark_column} >= '{watermark_value}' THEN INSERT *
"""
spark.sql(merge_statement)
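
Since the source files also carry an IsDelete flag, the same MERGE could be extended to drop soft-deleted rows from the target. The statement below is a hedged sketch assuming IsDelete is a boolean column where true marks a deleted row:

# Variation of the MERGE that also removes rows flagged as deleted
merge_with_delete = f"""
MERGE INTO delta.`{delta_table_path}` AS target
USING temp_incremental_movies AS source
ON target.movie_id = source.movie_id
WHEN MATCHED AND source.IsDelete = true THEN DELETE
WHEN MATCHED AND source.{watermark_column} >= target.{watermark_column} THEN UPDATE SET *
WHEN NOT MATCHED AND source.IsDelete = false THEN INSERT *
"""
spark.sql(merge_with_delete)

The clauses are evaluated in order, so the DELETE condition is checked before the UPDATE.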


In the images below you can see the file format: both the Delta and Parquet outputs are stored as snappy-compressed Parquet files only.

[screenshots: storage listing showing snappy-compressed Parquet files for both formats]

Delta format is built on top of Parquet and manages data more effectively in big data environments.

Delta stores the data as Parquet files and just adds an additional layer on top, with advanced features such as a history of events (the transaction log) and support for changing data: update, delete, and merge capabilities.
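
As a quick illustration (reusing the delta_table_path from the code above), the transaction log can be inspected and an earlier version of the table read back; this is just a sketch of standard Delta Lake commands:

# Show the transaction log: one row per commit (writes, merges, etc.)
spark.sql(f"DESCRIBE HISTORY delta.`{delta_table_path}`").show(truncate=False)

# Time travel: read the table as it was at an earlier version
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)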

Know more about Delta Tables

Upvotes: 0

Related Questions