Reputation: 712
I am specifically looking to optimize the performance of updating and inserting data into a Delta Lake base table with about 4 trillion records.
For context, this is about building an incremental table with Delta Lake. The initial setup (creating the base table and loading the historical data) has already been done, but adding new data is notoriously slow: merging a ~9 GB CSV takes about 6 hours, mainly because Delta has to rewrite data files for each update and also has to read all of the existing data in the table.
The table is also partitioned (PARTITIONED BY) and stored on the cluster's HDFS so that the Spark nodes can perform the operations.
The fields of the base table:
| Name | Type | Cardinality | Comment |
|---|---|---|---|
| ID | int | 10000 | Identifier |
| TYPE | string | 30 | |
| LOCAL_DATE | date | | Local date of the record |
| DATE_UTC | date | | UTC date of registration |
| VALUE | int | | Registry value |
| YEAR | int | 4 | Calculated column |
| MONTH | int | 12 | Calculated column |
| DAY | int | 31 | Calculated column |

(In the CREATE TABLE statement below, the same fields appear under their original names TIPO, FECHA_LOCAL and FECHA_UTC.)
Since the table is mostly queried by time, it was decided to partition based on the LOCAL_DATE column, split into YEAR, MONTH and DAY. Partitioning directly by ID or LOCAL_DATE was ruled out because of their high cardinality (which is worse for performance), and TYPE was finally added as a partition column as well, resulting in:
spark.sql(f"""
    CREATE OR REPLACE TABLE {TABLE_NAME} (
        ID INT,
        FECHA_LOCAL TIMESTAMP,
        FECHA_UTC TIMESTAMP,
        TIPO STRING,
        VALUE DOUBLE,
        YEAR INT,
        MONTH INT,
        DAY INT
    )
    USING DELTA
    PARTITIONED BY (YEAR, MONTH, DAY, TIPO)
    LOCATION '{location}'
""")
From here on, the table grows incrementally by adding one of these roughly 9 GB CSV files every 5 days. Currently the MERGE operation is as follows:
spark.sql(f"""
MERGE INTO {BASE_TABLE_NAME}
USING {INCREMENTAL_TABLE_NAME} ON
--partitioned cols
{BASE_TABLE_NAME}.YEAR = {INCREMENTAL_TABLE_NAME}.YEAR AND
{BASE_TABLE_NAME}.MONTH = {INCREMENTAL_TABLE_NAME}.MONTH AND
{BASE_TABLE_NAME}.DAY = {INCREMENTAL_TABLE_NAME}.DAY AND
{BASE_TABLE_NAME}.TIPO = {INCREMENTAL_TABLE_NAME}.TIPO AND
{BASE_TABLE_NAME}.FECHA_LOCAL = {INCREMENTAL_TABLE_NAME}.FECHA_LOCAL AND
{BASE_TABLE_NAME}.ID = {INCREMENTAL_TABLE_NAME}.ID
WHEN MATCHED THEN UPDATE
SET {BASE_TABLE_NAME}.VALUE = {INCREMENTAL_TABLE_NAME}.VALUE,
{BASE_TABLE_NAME}.TIPO = {INCREMENTAL_TABLE_NAME}.TIPO
WHEN NOT MATCHED THEN INSERT *
""")
Some facts to consider:
Spark app:
mode = 'spark://spark-master:7077'
# mode = 'local[*]'
spark = (
SparkSession.builder
.master(mode)
.appName("SparkApp")
.config('spark.cores.max', '45')
.config('spark.executor.cores', '5')
.config('spark.executor.memory', '11g')
.config('spark.driver.memory', '120g')
.config("spark.sql.shuffle.partitions", "200") # 200 only for 200GB delta table reads
.config("spark.storage.memoryFraction", "0.8")
# DeltaLake configs
.config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
# Delta optimization
.config("spark.databricks.delta.optimizeWrite.enabled", "true")
.config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
.getOrCreate()
)
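One more thing worth checking is how much of the table each MERGE actually rewrites. Delta records this per commit in the table history; the sketch below assumes the Delta version in use supports the DESCRIBE HISTORY SQL command (the DeltaTable.history() Python API is an alternative), and the exact operationMetrics keys can vary between versions:
# Inspect the most recent MERGE commits and their operation metrics
# (e.g. numTargetFilesAdded / numTargetFilesRemoved) to see how many
# files are removed and rewritten on each incremental load.
history_df = spark.sql(f"DESCRIBE HISTORY {BASE_TABLE_NAME}")

(history_df
    .filter("operation = 'MERGE'")
    .select("version", "timestamp", "operationMetrics")
    .show(10, truncate=False))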
Upvotes: 2
Views: 8927
Reputation: 712
Well, I chose to share this answer so that you can take advantage of some tips.
Delta recommends including all the partition columns in the merge condition; this way less data has to be read, thanks to the partition "pruning" effect.
So it is necessary to identify all the cases where the merge could update data. For this, a query is run on the incremental data to generate a dictionary of this type:
filter_columns = spark.sql(f"""
    SELECT
        YEAR,
        MONTH,
        DAY,
        COLLECT_LIST(DISTINCT TIPO) AS TIPOS
    FROM incremental
    GROUP BY YEAR, MONTH, DAY
    ORDER BY 1, 2, 3
""").toPandas()
With this df it is possible to generate the conditions where the merge must update / insert:
(screenshot: the filter_columns DataFrame grouped by YEAR, MONTH and DAY with the collected TIPO values)
Then a string called "final_cond" is generated like this:
# Map each (YEAR, MONTH, DAY) key to the list of TIPO values present in the incremental data.
dic = filter_columns.groupby(['YEAR', 'MONTH', 'DAY'])['TIPOS'].apply(list).to_dict()

final_cond = ''
index = 0
for key, value in dic.items():
    year, month, day = key
    # Quote each TIPO value for the SQL IN (...) list.
    variables = ','.join(["'" + str(x) + "'" for x in value[0]])
    or_cond = '' if index + 1 == len(dic) else '\nOR\n'
    cond = f"({BASE_TABLE_NAME}.YEAR = {year} AND {BASE_TABLE_NAME}.MONTH = {month} AND {BASE_TABLE_NAME}.DAY = {day} AND {BASE_TABLE_NAME}.TIPO IN ({variables}))"
    final_cond = final_cond + cond + or_cond
    index += 1

print(final_cond)
(screenshot: the generated final_cond string)
Finally we add these conditions to the MERGE:
...
WHEN MATCHED AND ({final_cond}) THEN
...
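Put together, the rewritten statement looks roughly like this (a sketch that keeps the original join condition from the question and assumes final_cond was built as above and is non-empty):
spark.sql(f"""
MERGE INTO {BASE_TABLE_NAME}
USING {INCREMENTAL_TABLE_NAME} ON
    --partitioned cols
    {BASE_TABLE_NAME}.YEAR = {INCREMENTAL_TABLE_NAME}.YEAR AND
    {BASE_TABLE_NAME}.MONTH = {INCREMENTAL_TABLE_NAME}.MONTH AND
    {BASE_TABLE_NAME}.DAY = {INCREMENTAL_TABLE_NAME}.DAY AND
    {BASE_TABLE_NAME}.TIPO = {INCREMENTAL_TABLE_NAME}.TIPO AND
    {BASE_TABLE_NAME}.FECHA_LOCAL = {INCREMENTAL_TABLE_NAME}.FECHA_LOCAL AND
    {BASE_TABLE_NAME}.ID = {INCREMENTAL_TABLE_NAME}.ID
WHEN MATCHED AND ({final_cond}) THEN UPDATE
    SET {BASE_TABLE_NAME}.VALUE = {INCREMENTAL_TABLE_NAME}.VALUE,
        {BASE_TABLE_NAME}.TIPO = {INCREMENTAL_TABLE_NAME}.TIPO
WHEN NOT MATCHED THEN INSERT *
""")
Because the extra condition only lists the (YEAR, MONTH, DAY, TIPO) combinations that actually appear in the incremental batch, files in all other partitions can be skipped, which is the "pruning" effect described above.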
This simple "filter" reduced the merge time for large operations.
Upvotes: 1