How to improve the performance of a merge operation on an incremental Delta Lake table?

I am specifically looking to optimize the performance of updating and inserting data into a Delta Lake base table with about 4 trillion records.

Environment:

For context, this is about building an incremental table with Delta Lake. I'll summarize the process in steps:

  1. Creation of the base table (delta)
  2. Obtaining periodic data
  3. Add the data to the base table

Steps 1 and 2 have already been done, but adding the data is notoriously slow: for example, merging a 9 GB CSV takes about 6 hours. This is mainly because Delta needs to rewrite the affected data files for each update, and it also needs to read all the existing data in the table.

This table is also partitioned (PARTITIONED BY) and stored on the cluster's HDFS so that the Spark nodes can perform the operations.

The fields of the base table:

Name        Type    Cardinality  Comment
ID          int     10000        Identifier
TYPE        string  30
LOCAL_DATE  date                 Local date of the record
DATE_UTC    date                 UTC date of the record
VALUE       int                  Record value
YEAR        int     4            Calculated column
MONTH       int     12           Calculated column
DAY         int     31           Calculated column

Since searches are generally by time, it was decided to partition by the LOCAL_DATE column, split into YEAR, MONTH and DAY. Partitioning directly by the ID and LOCAL_DATE columns was ruled out because of their high cardinality (which is worse for performance), and TYPE was added last, resulting in the following:

spark.sql(f"""
    CREATE OR REPLACE TABLE  {TABLE_NAME} (
        ID INT, 
        FECHA_LOCAL TIMESTAMP,
        FECHA_UTC TIMESTAMP,
        TIPO STRING, 
        VALUE DOUBLE,
        YEAR INT,
        MONTH INT, 
        DAY INT )
    USING DELTA
    PARTITIONED BY (YEAR , MONTH , DAY, TIPO)
    LOCATION '{location}'
""")

From now on, the incremental load consists of periodically adding these CSV files of approximately 9 GB every 5 days. Currently the MERGE operation is as follows:

spark.sql(f"""
    MERGE INTO {BASE_TABLE_NAME}
    USING {INCREMENTAL_TABLE_NAME} ON 
        --partitioned cols
        {BASE_TABLE_NAME}.YEAR        = {INCREMENTAL_TABLE_NAME}.YEAR AND
        {BASE_TABLE_NAME}.MONTH       = {INCREMENTAL_TABLE_NAME}.MONTH AND  
        {BASE_TABLE_NAME}.DAY         = {INCREMENTAL_TABLE_NAME}.DAY AND 
        {BASE_TABLE_NAME}.TIPO        = {INCREMENTAL_TABLE_NAME}.TIPO AND 
        {BASE_TABLE_NAME}.FECHA_LOCAL = {INCREMENTAL_TABLE_NAME}.FECHA_LOCAL AND 
        {BASE_TABLE_NAME}.ID          = {INCREMENTAL_TABLE_NAME}.ID
    WHEN MATCHED THEN UPDATE
        SET {BASE_TABLE_NAME}.VALUE = {INCREMENTAL_TABLE_NAME}.VALUE,
            {BASE_TABLE_NAME}.TIPO  = {INCREMENTAL_TABLE_NAME}.TIPO
    WHEN NOT MATCHED THEN INSERT *
    """)

Some facts to consider:

Spark app:

mode = 'spark://spark-master:7077'
# mode = 'local[*]'
spark = (
    SparkSession.builder
    .master(mode)
    .appName("SparkApp")
    .config('spark.cores.max', '45')
    .config('spark.executor.cores', '5')
    .config('spark.executor.memory', '11g')
    .config('spark.driver.memory', '120g')
    .config("spark.sql.shuffle.partitions", "200") # 200 only for 200GB delta table reads
    .config("spark.storage.memoryFraction", "0.8")
    # DeltaLake configs
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Delta optimization
    .config("spark.databricks.delta.optimizeWrite.enabled", "true")
    .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
    .getOrCreate()
)

Upvotes: 2

Views: 8927

Answers (1)

Well, I chose to share this answer so that you can take advantage of some tips.

Delta recommends including all partition columns in the merge condition; this way the final data scan is smaller, thanks to the effect of "pruning".

So it is necessary to identify all the cases where the merge can update data. To do this, a query is run over the incremental data to generate a dictionary of this type:

filter_columns = spark.sql(f"""
    SELECT
        YEAR,
        MONTH,
        DAY,
        COLLECT_LIST(DISTINCT TIPO) AS TYPES
    FROM {INCREMENTAL_TABLE_NAME}
    GROUP BY YEAR, MONTH, DAY
    ORDER BY 1, 2, 3
""").toPandas()

With this DataFrame it is possible to generate the conditions under which the merge must update / insert:

[screenshot: DataFrame grouped by YEAR, MONTH, DAY with the collected list of types per partition]

Then a string called "final_cond" is generated like this:

dic = filter_columns.groupby(['YEAR', 'MONTH', 'DAY'])['TYPES'].apply(list).to_dict()
final_cond = ''
index = 0
for key, value in dic.items():
    year = key[0]
    month = key[1]
    day = key[2]
    # value[0] holds the list of types collected for this YEAR/MONTH/DAY
    variables = ','.join(["'" + str(x) + "'" for x in value[0]])
    or_cond = '' if index + 1 == len(dic) else '\nOR\n'

    cond = f"""({BASE_TABLE_NAME}.YEAR == {year} AND {BASE_TABLE_NAME}.MONTH == {month} AND {BASE_TABLE_NAME}.DAY == {day} AND {BASE_TABLE_NAME}.TIPO IN ({variables}))"""

    final_cond = final_cond + cond + or_cond
    index += 1

print(final_cond)

[screenshot: the generated string condition]
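For illustration, with purely hypothetical dates and types (and assuming {BASE_TABLE_NAME} resolves to base_table), the printed string has this shape:

(base_table.YEAR == 2021 AND base_table.MONTH == 1 AND base_table.DAY == 5 AND base_table.TIPO IN ('A','B'))
OR
(base_table.YEAR == 2021 AND base_table.MONTH == 1 AND base_table.DAY == 10 AND base_table.TIPO IN ('A'))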

Finally we add these conditions to the MERGE:

...
WHEN MATCHED AND ({final_cond}) THEN
...
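Putting it together, the full statement would look roughly like this; it is simply a sketch of the MERGE from the question with the extra predicate on the matched branch:

spark.sql(f"""
    MERGE INTO {BASE_TABLE_NAME}
    USING {INCREMENTAL_TABLE_NAME} ON
        {BASE_TABLE_NAME}.YEAR        = {INCREMENTAL_TABLE_NAME}.YEAR AND
        {BASE_TABLE_NAME}.MONTH       = {INCREMENTAL_TABLE_NAME}.MONTH AND
        {BASE_TABLE_NAME}.DAY         = {INCREMENTAL_TABLE_NAME}.DAY AND
        {BASE_TABLE_NAME}.TIPO        = {INCREMENTAL_TABLE_NAME}.TIPO AND
        {BASE_TABLE_NAME}.FECHA_LOCAL = {INCREMENTAL_TABLE_NAME}.FECHA_LOCAL AND
        {BASE_TABLE_NAME}.ID          = {INCREMENTAL_TABLE_NAME}.ID
    WHEN MATCHED AND ({final_cond}) THEN UPDATE
        SET {BASE_TABLE_NAME}.VALUE = {INCREMENTAL_TABLE_NAME}.VALUE,
            {BASE_TABLE_NAME}.TIPO  = {INCREMENTAL_TABLE_NAME}.TIPO
    WHEN NOT MATCHED THEN INSERT *
""")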

This simple "filter" reduced the merge time for large operations.

Upvotes: 1
