Reputation: 21
I'm trying to get Apache Iceberg set up in our Databricks environment, and I'm running into an error when executing a MERGE statement in Spark SQL.
This code:
CREATE TABLE iceberg.db.table (id bigint, data string) USING iceberg;
INSERT INTO iceberg.db.table VALUES (1, 'a'), (2, 'b'), (3, 'c');
INSERT INTO iceberg.db.table SELECT id, data FROM (SELECT * FROM iceberg.db.table) t WHERE length(data) = 1;
MERGE INTO iceberg.db.table t USING (SELECT * FROM iceberg.db.table) u ON t.id = u.id
WHEN NOT MATCHED THEN INSERT *;
Generates this error:
Error in SQL statement: AnalysisException: MERGE destination only supports Delta sources.
Some(RelationV2[id#116L, data#117] iceberg.db.table
I believe the root of the issue is that MERGE is also a keyword handled by the Delta Lake SQL engine. From what I can tell, the problem stems from the order in which Spark applies its analysis rules: MERGE triggers the Delta rule first, which then throws an error because the target is not a Delta table. I'm able to read, append, and overwrite Iceberg tables without issue.
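For example, all of these succeed in the same session against the table created above (the values are just illustrative):
SELECT * FROM iceberg.db.table;                -- read
INSERT INTO iceberg.db.table VALUES (4, 'd');  -- append
INSERT OVERWRITE iceberg.db.table VALUES (1, 'a'), (2, 'b'), (3, 'c');  -- overwrite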
Primary Question: How can I get Spark to recognize this as an Iceberg query rather than a Delta one? Or is it possible to remove the Delta-related SQL rules altogether?
Environment
Spark version: 3.0.1
Databricks runtime version: 7.6
Iceberg configs
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg.type=hadoop
spark.sql.catalog.iceberg.warehouse=BLOB_STORAGE_CONTAINER
Stack trace:
com.databricks.backend.common.rpc.DatabricksExceptions$SQLExecutionException: org.apache.spark.sql.AnalysisException: MERGE destination only supports Delta sources.
Some(RelationV2[id#116L, data#117] iceberg.db.table
);
at com.databricks.sql.transaction.tahoe.DeltaErrors$.notADeltaSourceException(DeltaErrors.scala:343)
at com.databricks.sql.transaction.tahoe.PreprocessTableMerge.apply(PreprocessTableMerge.scala:201)
at com.databricks.sql.transaction.tahoe.PreprocessTableMergeEdge$$anonfun$apply$1.applyOrElse(PreprocessTableMergeEdge.scala:39)
at com.databricks.sql.transaction.tahoe.PreprocessTableMergeEdge$$anonfun$apply$1.applyOrElse(PreprocessTableMergeEdge.scala:36)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$2(AnalysisHelper.scala:112)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$1(AnalysisHelper.scala:112)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:216)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown(AnalysisHelper.scala:110)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown$(AnalysisHelper.scala:108)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:73)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:72)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:29)
at com.databricks.sql.transaction.tahoe.PreprocessTableMergeEdge.apply(PreprocessTableMergeEdge.scala:36)
at com.databricks.sql.transaction.tahoe.PreprocessTableMergeEdge.apply(PreprocessTableMergeEdge.scala:29)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:152)
Upvotes: 2
Views: 3484
Reputation: 14939
Not exactly what you're looking for, but Databricks lets you convert an Iceberg table in place (no data copying) into a Delta table:
https://docs.databricks.com/delta/delta-utility.html#convert-iceberg-to-delta
Requires DBR 10.4+
-- Convert the Iceberg table in the path <path-to-table>.
CONVERT TO DELTA iceberg.`<path-to-table>`
-- Convert the Iceberg table in the path <path-to-table> without collecting statistics.
CONVERT TO DELTA iceberg.`<path-to-table>` NO STATISTICS
Then run MERGE on the Delta table.
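For example, a minimal sketch (`updates` is a placeholder for whatever source you merge from):
MERGE INTO delta.`<path-to-table>` t
USING updates u
ON t.id = u.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;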
If Iceberg supports a similar in-place conversion in the opposite direction (Delta back to Iceberg; I am not sure it does), that round trip would solve the original problem.
Upvotes: 0
Reputation: 371
On Databricks, only INSERT operations are allowed against non-Delta sources; DELETE and MERGE operations are not allowed.
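If you only need the insert-if-absent half of MERGE (as in the question), a plain INSERT with an anti-join achieves the same result on the Iceberg table (a sketch; `source_table` is a placeholder for your source of new rows):
INSERT INTO iceberg.db.table
SELECT u.id, u.data
FROM source_table u
LEFT ANTI JOIN iceberg.db.table t ON u.id = t.id;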
Upvotes: 0
Reputation: 16576
I believe the issue here is that Databricks' own extensions always preempt other extensions added to the Spark session. This means the Iceberg code path is never reached; only the Databricks extensions are ever used. I would ask your Databricks rep whether there is a way to have the Iceberg extensions applied first, or whether they would consider allowing other MERGE implementations.
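As a quick sanity check (a sketch), you can at least confirm the Iceberg extension is registered in the session, even though the Databricks rules still run first:
SET spark.sql.extensions;
-- expected value: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions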
Upvotes: 2