Metadata

Reputation: 2083

How to merge data into partitions of a Databricks Delta table in parallel using PySpark/Spark Streaming?

I have a PySpark streaming pipeline that reads data from a Kafka topic, runs it through various transformations, and finally merges it into a Databricks Delta table. In the beginning we were loading data into the Delta table using the merge statement given below.

The incoming dataframe inc_df contained data for all partitions.

MERGE INTO main_db.main_delta_table main_dt
USING inc_df df
ON  main_dt.continent = df.continent
AND main_dt.str_id = df.str_id
AND main_dt.rule_date = df.rule_date
AND main_dt.rule_id = df.rule_id
AND main_dt.rule_read_start = df.rule_read_start
AND main_dt.company = df.company
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

We were executing the above query at the table level.
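
For context, this is roughly what the streaming job looks like, with the Kafka options, checkpoint path, and transformations reduced to hypothetical placeholders; it is a sketch, not the exact production code:

from delta.tables import DeltaTable

def merge_to_delta(inc_df, batch_id):
    # Upsert one micro-batch using the same join condition as the SQL MERGE above.
    main_dt = DeltaTable.forName(spark, "main_db.main_delta_table")
    (main_dt.alias("main_dt")
        .merge(inc_df.alias("df"),
               "main_dt.continent = df.continent AND main_dt.str_id = df.str_id AND "
               "main_dt.rule_date = df.rule_date AND main_dt.rule_id = df.rule_id AND "
               "main_dt.rule_read_start = df.rule_read_start AND main_dt.company = df.company")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "BROKER:9092")    # hypothetical broker
    .option("subscribe", "SOURCE_TOPIC")                  # hypothetical topic
    .load()
    # ... transformations that produce the columns used in the merge ...
    .writeStream
    .foreachBatch(merge_to_delta)
    .option("checkpointLocation", "s3://BUCKET/checkpoints/")  # hypothetical path
    .start())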

I have given a very basic diagram of the process in the image below.

But my Delta table is partitioned on continent and year. For example, this is what my partitioned Delta table looks like.
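
In case it helps, the table in this layout would have been written roughly like this (partition columns as above; the path and write mode are assumptions):

(inc_df.write
    .format("delta")
    .partitionBy("continent", "year")
    .mode("append")
    .save("s3://PATH_OF_BASE_DELTA_TABLE"))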

So I tried implementing the merge at the partition level and running the merge on multiple partitions in parallel, i.e. I created separate pipelines with partition-level filters in the queries.

MERGE INTO main_db.main_delta_table main_dt
USING inc_df df
ON  main_dt.continent IN ('AFRICA')
AND main_dt.year IN ('202301')
AND main_dt.continent = df.continent
AND main_dt.str_id = df.str_id
AND main_dt.rule_date = df.rule_date
AND main_dt.rule_id = df.rule_id
AND main_dt.rule_read_start = df.rule_read_start
AND main_dt.company = df.company
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

But I am seeing a concurrency error:

com.databricks.sql.transaction.tahoe.ConcurrentAppendException: Files were added to partition [continent=AFRICA, year=2021] by a concurrent update. Please try the operation again.

I understand that the error is telling me the files cannot be updated concurrently. But I have a huge volume of data in production, and I don't want to perform the merge at the table level, against almost 1 billion records, without proper filters.

Trial 2: As an alternative approach,

  1. I saved my incremental dataframe to an S3 bucket (like a staging dir) and ended my streaming pipeline there.
  2. Then a separate PySpark job reads data from that S3 staging dir and performs the merge into my main Delta table, once again at the partition level (I have specified partitions in those jobs as filters), roughly as sketched below.
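
A rough sketch of that separate batch job, assuming a hypothetical staging path and partition filter values (the staging data is assumed to be written as Delta; the merge condition mirrors the one above):

from delta.tables import DeltaTable

# Read the staged increment and restrict it to a single partition (hypothetical path and filter values).
staged_df = (spark.read.format("delta")
             .load("s3://STAGING_BUCKET/inc_data/")
             .filter("continent = 'AFRICA' AND year = '202301'"))

base_delta = DeltaTable.forPath(spark, "s3://PATH_OF_BASE_DELTA_TABLE")
(base_delta.alias("main_dt")
    .merge(
        source=staged_df.alias("df"),
        condition="main_dt.continent = df.continent AND main_dt.str_id = df.str_id AND "
                  "main_dt.rule_date = df.rule_date AND main_dt.rule_id = df.rule_id AND "
                  "main_dt.rule_read_start = df.rule_read_start AND main_dt.company = df.company")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())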

But I am facing the same exception/error there as well.

Could anyone let me know how I can design and optimise my streaming pipeline to merge data into the Delta table at the partition level with multiple jobs running in parallel (one job per individual partition)?

Trial 3:

I also made another attempt with a different approach, as mentioned in the link and the ConcurrentAppendException section of that page.

from delta.tables import DeltaTable

base_delta = DeltaTable.forPath(spark, 's3://PATH_OF_BASE_DELTA_TABLE')
(base_delta.alias("main_dt").merge(
    source=final_incremental_df.alias("df"),
    condition="main_dt.continent=df.continent AND main_dt.str_id=df.str_id AND main_dt.rule_date=df.rule_date AND main_dt.rule_id=df.rule_id AND main_dt.rule_read_start=df.rule_read_start AND main_dt.company = df.company, continent='Africa'")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())

and

from delta.tables import DeltaTable

base_delta = DeltaTable.forPath(spark, 's3://PATH_OF_BASE_DELTA_TABLE')
(base_delta.alias("main_dt").merge(
    source=final_incremental_df.alias("df"),
    condition="main_dt.continent=df.continent AND main_dt.str_id=df.str_id AND main_dt.rule_date=df.rule_date AND main_dt.rule_id=df.rule_id AND main_dt.rule_read_start=df.rule_read_start AND main_dt.company = df.company, continent='ASIA'")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())

I ran the above merge operations in two separate pipelines. But I am still facing the same issue.

Upvotes: 1

Views: 1391

Answers (1)

Robert Kossendey

Reputation: 7028

In your trial 3, you need to change the merge condition. Instead of

condition="main_dt.continent=df.continent AND [...]"

it should be

condition="main_dt.continent='Africa' AND [...]"

You should also delete the continent='Africa' from the end of the condition.
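
Applied to the code from Trial 3, the corrected merge would look roughly like this (a sketch; the table path and dataframe name are taken from the question):

from delta.tables import DeltaTable

base_delta = DeltaTable.forPath(spark, 's3://PATH_OF_BASE_DELTA_TABLE')
(base_delta.alias("main_dt").merge(
    source=final_incremental_df.alias("df"),
    condition="main_dt.continent='Africa' AND main_dt.str_id=df.str_id AND "
              "main_dt.rule_date=df.rule_date AND main_dt.rule_id=df.rule_id AND "
              "main_dt.rule_read_start=df.rule_read_start AND main_dt.company = df.company")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())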

Here is the documentation for reference.

Upvotes: 0
