Aaron Kub

Reputation: 21

How to execute a Spark SQL merge statement on an Iceberg table in Databricks?

I'm trying to get Apache Iceberg set up in our Databricks environment and running into an error when executing a MERGE statement in Spark SQL.

This code:

CREATE TABLE iceberg.db.table (id bigint, data string) USING iceberg;

INSERT INTO iceberg.db.table VALUES (1, 'a'), (2, 'b'), (3, 'c');

INSERT INTO iceberg.db.table SELECT id, data FROM (SELECT * FROM iceberg.db.table) t WHERE length(data) = 1;

MERGE INTO iceberg.db.table t USING (SELECT * FROM iceberg.db.table) u ON t.id = u.id
WHEN NOT MATCHED THEN INSERT *

Generates this error:

Error in SQL statement: AnalysisException: MERGE destination only supports Delta sources.
Some(RelationV2[id#116L, data#117] iceberg.db.table
);

I believe the root of the issue is that MERGE is also a keyword for the Delta Lake SQL engine. From what I can tell, the problem stems from the order in which Spark applies its analysis rules: MERGE triggers the Delta rule first, which then throws an error because the target is not a Delta table. I'm able to read from, append to, and overwrite Iceberg tables without issue.
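As a sanity check (a minimal sketch; in Spark SQL, SET with just a key echoes that key's current value), you can confirm the Iceberg extensions and catalog are actually registered on the session:

-- Should print the Iceberg extensions class if the cluster config was picked up.
SET spark.sql.extensions;

-- Should print org.apache.iceberg.spark.SparkCatalog.
SET spark.sql.catalog.iceberg;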

Primary Question: How can I get Spark to recognize this as an Iceberg query rather than a Delta one? Or is it possible to remove the Delta-related SQL rules altogether?

Environment

Spark version: 3.0.1

Databricks runtime version: 7.6

Iceberg configs

spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg.type=hadoop
spark.sql.catalog.iceberg.warehouse=BLOB_STORAGE_CONTAINER
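
If the catalog is resolving identifiers correctly, Iceberg's metadata tables should also be queryable through it (a sketch against the table from the question, assuming an Iceberg release that exposes metadata tables in Spark 3):

-- Lists the snapshots Iceberg has recorded for the table; this only
-- resolves if spark.sql.catalog.iceberg is wired up as intended.
SELECT snapshot_id, operation FROM iceberg.db.table.snapshots;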

Stack trace:

com.databricks.backend.common.rpc.DatabricksExceptions$SQLExecutionException: org.apache.spark.sql.AnalysisException: MERGE destination only supports Delta sources.
Some(RelationV2[id#116L, data#117] iceberg.db.table
);
    at com.databricks.sql.transaction.tahoe.DeltaErrors$.notADeltaSourceException(DeltaErrors.scala:343)
    at com.databricks.sql.transaction.tahoe.PreprocessTableMerge.apply(PreprocessTableMerge.scala:201)
    at com.databricks.sql.transaction.tahoe.PreprocessTableMergeEdge$$anonfun$apply$1.applyOrElse(PreprocessTableMergeEdge.scala:39)
    at com.databricks.sql.transaction.tahoe.PreprocessTableMergeEdge$$anonfun$apply$1.applyOrElse(PreprocessTableMergeEdge.scala:36)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$2(AnalysisHelper.scala:112)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$1(AnalysisHelper.scala:112)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:216)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown(AnalysisHelper.scala:110)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown$(AnalysisHelper.scala:108)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:73)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:72)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:29)
    at com.databricks.sql.transaction.tahoe.PreprocessTableMergeEdge.apply(PreprocessTableMergeEdge.scala:36)
    at com.databricks.sql.transaction.tahoe.PreprocessTableMergeEdge.apply(PreprocessTableMergeEdge.scala:29)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:152)

Upvotes: 2

Views: 3484

Answers (3)

Tagar

Reputation: 14939

Not exactly what you're looking for, but Databricks lets you convert an Iceberg table in place (no data copying) into a Delta table:

https://docs.databricks.com/delta/delta-utility.html#convert-iceberg-to-delta

Requires DBR 10.4+

-- Convert the Iceberg table in the path <path-to-table>.
CONVERT TO DELTA iceberg.`<path-to-table>`

-- Convert the Iceberg table in the path <path-to-table> without collecting statistics.
CONVERT TO DELTA iceberg.`<path-to-table>` NO STATISTICS

Then run MERGE on the Delta table.
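
For example, the question's insert-only merge would then run against the converted table (a sketch; <path-to-table> is the same placeholder as above, and updates stands in for whatever source relation you merge from):

-- After CONVERT TO DELTA the target is a Delta table, so the Delta MERGE rule applies.
MERGE INTO delta.`<path-to-table>` t
USING updates u
ON t.id = u.id
WHEN NOT MATCHED THEN INSERT *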

If Iceberg offers the same kind of in-place conversion in the opposite direction (Delta back to Iceberg; I am not sure it does), this would solve the original problem.

Upvotes: 0

halfwind22

Reputation: 371

Only INSERT operations are allowed for non-Delta sources; DELETE and MERGE operations are not.
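
That restriction still leaves room for the insert-only merge in the question: WHEN NOT MATCHED THEN INSERT only adds rows whose keys are absent from the target, so the same logic can be expressed as a plain INSERT with a left anti join (a sketch against the question's table; updates stands in for the real source):

-- Insert only the source rows whose id is not already in the target;
-- equivalent to the WHEN NOT MATCHED THEN INSERT * branch.
INSERT INTO iceberg.db.table
SELECT u.id, u.data
FROM updates u
LEFT ANTI JOIN iceberg.db.table t ON u.id = t.id;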

Upvotes: 0

RussS

Reputation: 16576

I believe the issue here is that Databricks always runs its own rules ahead of any other extensions added to the Spark session. This means the Iceberg MERGE codepath is never reached; the Databricks (Delta) extensions always win. I would ask your Databricks rep whether the Iceberg extensions can be placed first, or whether they would consider allowing other MERGE implementations.

Upvotes: 2
