Reputation: 404
We have an application that uses an on-premises CDP (Cloudera) cluster for submitting PySpark jobs; the Spark version is 2.x. We are now exploring options to process CDC datasets and merge them with the base dataset. We found the option of using delta-spark and also the CDF (Change Data Feed) feature from Databricks. However, on doing an in-depth analysis, we find that CDF is possible on a Databricks cluster and cannot be used on a CDP cluster.
Is there any alternative that gives a CDF-like feature (i.e. row-level versioning) on the Cloudera platform?
We also did a proof of concept using the delta-spark (v0.6.0) package when creating our Spark session on our CDP (YARN-managed) cluster. We are able to write our dataframe in Delta format into a Delta table (which is nothing but parquet files plus the delta log that maintains versions at the Delta-table level). However, we noticed that when we merge our CDC dataset with the source dataset using the merge operation from the delta-spark library, the new versioned dataset duplicates the records that were unchanged. This is additional storage overhead.
Is there a way to avoid duplicating unchanged records?
Update 1: I'm not sure the CDF feature is available on the CDP (Cloudera) platform. I tried enabling CDF in two ways. The first way was as below, when creating the Spark session:
import os
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config('spark.deploy.mode', 'cluster') \
    .config('spark.num.executors', '4') \
    .config('spark.executor.memory', '15G') \
    .config('spark.driver.memory', '11G') \
    .config('spark.queue', 'root.default') \
    .config('spark.driver.port', '21261') \
    .config('spark.driver.bindAddress', '0.0.0.0') \
    .config('spark.driver.blockManager.port', '21263') \
    .config('spark.driver.host', os.environ['AE_HOST_IP']) \
    .config('spark.master', 'yarn') \
    .config("spark.jars.packages", "io.delta:delta-core_2.11:0.6.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config('spark.databricks.delta.properties.defaults.enableChangeDataFeed', 'true') \
    .config('spark.yarn.stagingDir', 'hdfs:///testq/_pyspark_staging') \
    .config('spark.executorEnv.PYTHONPATH', 'pyspark.zip:py4j-0.10.7-src.zip') \
    .appName('Test_application') \
    .getOrCreate()
The other way I tried was enabling CDF on the already created Spark session object via Spark SQL, as below:
spark.sql("set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true")
Either way, I do not get any error from the Spark session, and I then save my dataframe in Delta format as below:
df.write.format("delta").mode("overwrite").save("/path/to/my/delta-table")
I would expect the _change_data folder to be created under the above Delta table path, as mentioned in the documentation. However, I do not see any such folder created.
Moreover, if I run a Spark SQL query on table_changes, I get an error saying it cannot be resolved to a table-valued function:
spark.sql("select * from table_changes('/path/to/my/delta-table', 0, 1)").show()
spark.sql("select * from table_changes").show()
> Py4JJavaError: An error occurred while calling o98.sql. :
> org.apache.spark.sql.AnalysisException: could not resolve
> `table_changes` to a table-valued function; line 1 pos 14
However, as suggested, I'm now trying delta-spark 2.0.0 on a Spark 3.x cluster to see if the above CDF feature works with it.
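For reference, this is roughly how I understand CDF would be enabled and read on Delta Lake 2.x, based on the Delta docs (a sketch only; I have not verified it on our CDP cluster yet, and the path is the same placeholder as above):

# Enable the change data feed as a table property (Delta Lake 2.x / Spark 3.x).
spark.sql("""
    ALTER TABLE delta.`/path/to/my/delta-table`
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Read the row-level changes, starting from the version at which CDF was enabled.
changes = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 1) \
    .load("/path/to/my/delta-table")
changes.select("_change_type", "_commit_version", "_commit_timestamp").show()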
Update2: Below is the merge code I'm trying out:
# read day-1 data
day1_df = spark.read.option('header','true').parquet('/raw_data_path/input_dataset.parquet')
# write day-1 data in delta format to a delta path
day1_df.write.format("delta").mode('overwrite').save("/delta_path/silver_table")
# create a DeltaTable object on top of the delta path
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "/delta_path/silver_table")
delta_df = delta_table.toDF()
If I check the files at the above delta path, I see the below:
####### Delta table files on hdfs ##########
Found 2 items
drwxrwx---+ - test_nosh test_sg 0 2023-09-05 10:35 /delta_path/silver_table/_delta_log
-rw-rw----+ 3 test_nosh test_sg 653292 2023-09-05 10:35 /delta_path/silver_table/part-00000-0c78e42e-8c97-4a5b-a384-ce1bd2abc4ed-c000.snappy.parquet
Now I read my CDC dataset:
day2_df = spark.read.option('header','true').parquet('/raw_data_path/cdc_dataset.parquet')
And then merge this CDC dataset with day-1 dataset as below:
result = delta_table.alias('base') \
    .merge(day2_df.alias('update'), 'base.pk_1 = update.pk_1') \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
Now if I list the files at the delta path, I see the below:
!hdfs dfs -ls /delta_path/silver_table
Found 3 items
drwxrwx---+ - test_nosh test_sg 0 2023-09-05 10:36 /delta_path/silver_table/_delta_log
-rw-rw----+ 3 test_nosh test_sg 653292 2023-09-05 10:35 /delta_path/silver_table/part-00000-0c78e42e-8c97-4a5b-a384-ce1bd2abc4ed-c000.snappy.parquet
-rw-rw----+ 3 test_nosh test_sg 653298 2023-09-05 10:36 /delta_path/silver_table/part-00000-e4e6e178-cf37-45ed-9ca5-c446cff66bf3-c000.snappy.parquet
As we can see here, delta-spark has created another parquet file with a size greater than the prior file. This is because, via the merge operation, it has brought in all the records that were unchanged, since we used whenNotMatchedInsertAll.
On the other hand, if I avoid this clause and just use the whenMatchedUpdateAll option, it will update the matched records and tag them with version 1, while the unchanged records will still be tagged with version 0. This in turn means that when I read the latest version from the Delta table, I will get only the records that have been updated.
Please let me know if there is a way to overcome this.
Basically, I'm looking for something like what the CDF feature provides, where versions are maintained at the record level. Something similar to what has been done in the below sample notebook:
https://docs.databricks.com/en/_extras/notebooks/source/delta/cdf-demo.html
Thanks in advance for any pointers!
Upvotes: 0
Views: 437
Reputation: 2043
AFAIK, the Delta Lake CDF feature has been available since the open-source Delta Lake v2.0.0 release, and it is not platform-specific. If you don't want to integrate with a Databricks cluster, you can use the open-source version. The only thing to watch is the Spark version, since v2.0.0 can only be used with Spark 3.x.
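For example, a Spark 3.x session wired up with open-source Delta Lake 2.0.0 could look roughly like the sketch below (the delta-core_2.12:2.0.0 coordinate assumes Scala 2.12, so adjust it to your cluster):

from pyspark.sql import SparkSession

# Minimal sketch: open-source Delta Lake 2.0.0 on a Spark 3.x cluster.
spark = SparkSession.builder \
    .appName("delta-cdf-test") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.0.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()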
I'm not sure how you designed your merging logic (maybe you need to provide your code), but you can use different condition blocks, for example whenMatchedUpdate and whenNotMatchedInsert, to control how records are updated. For example, you can update only when there is a difference between records that share the same key. You can check the merge builder docs.
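A minimal sketch of such a conditional merge (pk_1 is taken from your code, while col_a is just a placeholder for whatever data column you compare):

from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/delta_path/silver_table")

delta_table.alias("base") \
    .merge(day2_df.alias("update"), "base.pk_1 = update.pk_1") \
    .whenMatchedUpdateAll(condition="base.col_a <> update.col_a") \
    .whenNotMatchedInsertAll() \
    .execute()
# whenMatchedUpdateAll only rewrites a matched row when the condition holds,
# i.e. when something in the row actually changed; new keys are still inserted.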
Edit 1:
For your 1st update: your configuration looks fine; I believe the problem is the version. As mentioned, unless you use delta-spark 2.0.0 on a Spark 3.x cluster, you can't enable the change data feed feature. See the change data feed docs and compatibility:
> Note
> This feature is available in Delta Lake 2.0.0 and above. This feature is in experimental support mode.
For your 2nd update, it's actually related to two parts:
How Delta Lake writes the files: You mentioned: "However, we noticed that when we merge our CDC dataset with the source dataset using the merge operation from the delta-spark library, the new versioned dataset duplicates the records that were unchanged. This is additional storage overhead." Delta Lake will carry the unchanged data into the new parquet file. To be more accurate, it will rewrite the whole partition even if only 1 record inside the partition is updated. When you want to look back at a historical version of the table, the table version in the delta log indicates which parquet files should be read. This is how copy-on-write (COW) works.
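For instance, you can look at the table history to see what each merge actually rewrote (a sketch; the operationMetrics column, e.g. numTargetRowsCopied, is only populated on newer Delta versions):

from delta.tables import DeltaTable

# Inspect the commit history of the Delta table: one row per version/operation.
history = DeltaTable.forPath(spark, "/delta_path/silver_table").history()
history.select("version", "timestamp", "operation", "operationMetrics").show(truncate=False)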
How to maintain row-level record versions even without the CDF feature: since _change_type and _commit_version are the row-level metadata that the CDF feature brings, unfortunately you may need to design your own application logic. For example, you can read a specific table version with versionAsOf and use Spark's .exceptAll() to find the difference between two versions.
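A minimal sketch of that idea, assuming the same silver table path as in your code and that versions 0 and 1 exist:

# Time-travel reads of two consecutive table versions.
v0 = spark.read.format("delta").option("versionAsOf", 0).load("/delta_path/silver_table")
v1 = spark.read.format("delta").option("versionAsOf", 1).load("/delta_path/silver_table")

# Rows present in v1 but not in v0: inserts plus the new image of updated rows.
added_or_updated = v1.exceptAll(v0)
# Rows present in v0 but not in v1: deletes plus the old image of updated rows.
removed_or_previous = v0.exceptAll(v1)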
Upvotes: 0