Reputation: 31
I'm using Azure Databricks on runtime 14.3 LTS and trying to use the Hadoop catalog to test an Apache Iceberg storage implementation. When I try to write a table, I get this error:
Multiple sources found for iceberg (com.databricks.sql.transaction.tahoe.uniform.sources.IcebergBrowseOnlyDataSource, org.apache.iceberg.spark.source.IcebergSource), please specify the fully qualified class name.
Is there a way to override the Databricks UniForm reader so that we can use the Apache Iceberg runtime exclusively on our cluster?
My Spark cluster configuration and Spark config are set through the cluster UI (original screenshots omitted).
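For reference, the settings follow the usual Hadoop-catalog Iceberg pattern, roughly like this (placeholder values for illustration, not my exact config):

spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.spark_catalog org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.spark_catalog.type hadoop
spark.sql.catalog.spark_catalog.warehouse abfss://<container>@<account>.dfs.core.windows.net/warehouse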
Libraries installed on the cluster:
com.microsoft.azure:azure-storage:8.6.6
dbfs:/FileStore/jars/bf401b66_66f7_46c8_9964_b6ad07ad8099/data_lake_library-0.2.1.dev59+unknown-py3-none-any.whl (custom team built wheel)
numpy==1.26.4
org.apache.hadoop:hadoop-azure:3.4.0
org.apache.hadoop:hadoop-client-runtime:3.4.0
org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2
pandas==2.2.2
pyarrow==7.0.0
Simple code:
import logging
from pyspark.sql import SparkSession
from data_lake_library.config.config import AzureKeyVaultParams
from data_lake_library.config.secrets_manager import AzureKeyVaultSecrets

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def main():
    # Key Vault name and secret name for retrieving credentials
    keyvault_name = ""  # Replace with your actual Key Vault name
    secret_name = ""  # Replace with your actual secret name in Key Vault

    # Initialize Azure Key Vault
    logger.info(f"Initializing Azure Key Vault with name: {keyvault_name}")
    akv_params = AzureKeyVaultParams(keyvault_name=keyvault_name)
    akv = AzureKeyVaultSecrets(params=akv_params)

    # Retrieve the SAS token from Key Vault
    logger.info(f"Retrieving secret from Azure Key Vault: {secret_name}")
    access_token = akv.get_secret(secret_name)
    logger.info("Successfully retrieved SAS token from Azure Key Vault")

    # Set ADLS Gen2 configurations (service principal, OAuth token, or account key)
    storage_account_name = ""  # ADLS storage account name
    container_name = ""  # container used for testing

    # Initialize Spark session
    logger.info("Initializing Spark session.")
    spark = SparkSession.builder \
        .appName("Iceberg ADLS Gen2 Example") \
        .config(f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net", access_token) \
        .getOrCreate()
    logger.info("Spark session created successfully.")

    # Define the table path where the Iceberg table will be stored in ADLS Gen2
    table_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/my_database/my_iceberg_table"

    # Create a new database in ADLS Gen2 under the Hadoop-based Iceberg catalog
    logger.info("Creating database in ADLS Gen2 under the Hadoop-based Iceberg catalog.")
    spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.my_database")
    logger.info("Database created successfully (if not already present).")

    # Create an Iceberg table with an explicit path for storage in ADLS Gen2
    logger.info("Creating Iceberg table in ADLS Gen2.")
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS spark_catalog.my_database.my_iceberg_table (
            col1 INT,
            col2 STRING
        )
        USING iceberg
    """)
    logger.info(f"Iceberg table created successfully at location: {table_path}")

    # Stop the Spark session
    logger.info("Stopping Spark session.")
    spark.stop()

if __name__ == "__main__":
    main()
Full error:
Multiple sources found for iceberg (com.databricks.sql.transaction.tahoe.uniform.sources.IcebergBrowseOnlyDataSource, org.apache.iceberg.spark.source.IcebergSource), please specify the fully qualified class name.

File , line 62
     59 spark.stop()
     61 if __name__ == "__main__":
---> 62     main()

File , line 48, in main()
     46 # Create an Iceberg table with an explicit path for storage in ADLS Gen2
     47 logger.info("Creating Iceberg table in ADLS Gen2.")
---> 48 spark.sql(f"""
     49 CREATE TABLE IF NOT EXISTS spark_catalog.my_database.my_iceberg_table (
     50     col1 INT,
     51     col2 STRING
     52 )
     53 USING iceberg
     54 """)
     55 logger.info(f"Iceberg table created successfully at location: {table_path}")
     57 # Stop the Spark session

File /databricks/spark/python/pyspark/instrumentation_utils.py:47, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     45 start = time.perf_counter()
     46 try:
---> 47     res = func(*args, **kwargs)
     48 logger.log_success(
     49     module_name, class_name, function_name, time.perf_counter() - start, signature
     50 )
     51 return res

File /databricks/spark/python/pyspark/sql/session.py:1748, in SparkSession.sql(self, sqlQuery, args, **kwargs)
   1744 assert self._jvm is not None
   1745 litArgs = self._jvm.PythonUtils.toArray(
   1746     [_to_java_column(lit(v)) for v in (args or [])]
   1747 )
-> 1748 return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
   1749 finally:
   1750     if len(kwargs) > 0:

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1355, in JavaMember.__call__(self, *args)
   1349 command = proto.CALL_COMMAND_NAME +\
   1350     self.command_header +\
   1351     args_command +\
   1352     proto.END_COMMAND_PART
   1354 answer = self.gateway_client.send_command(command)
-> 1355 return_value = get_return_value(
   1356     answer, self.gateway_client, self.target_id, self.name)
   1358 for temp_arg in temp_args:
   1359     if hasattr(temp_arg, "_detach"):

File /databricks/spark/python/pyspark/errors/exceptions/captured.py:230, in capture_sql_exception.<locals>.deco(*a, **kw)
    226 converted = convert_exception(e.java_exception)
    227 if not isinstance(converted, UnknownException):
    228     # Hide where the exception came from that shows a non-Pythonic
    229     # JVM exception message.
--> 230 raise converted from None
    231 else:
    232     raise
Upvotes: 0
Views: 382
Reputation: 8020
This error is raised because two classes were found for the iceberg format:
com.databricks.sql.transaction.tahoe.uniform.sources.IcebergBrowseOnlyDataSource
and org.apache.iceberg.spark.source.IcebergSource.
So, on Databricks, you can instead use the Delta Lake Universal Format (UniForm) to create a table that is readable through the Iceberg source.
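Alternatively, as the error message itself suggests, you can specify the fully qualified class name instead of the short name iceberg so Spark resolves the Apache Iceberg source unambiguously. A rough, untested sketch with the DataFrame API (table_path as defined in the question):

# Read a path-based Iceberg table, naming the Apache Iceberg source explicitly
df = spark.read.format("org.apache.iceberg.spark.source.IcebergSource").load(table_path)

# Append the same way (assumes the Iceberg table already exists at table_path)
df.write.format("org.apache.iceberg.spark.source.IcebergSource").mode("append").save(table_path)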
Code
storage_account_name = "jadls"
container_name = "data"

spark = SparkSession.builder \
    .appName("Iceberg ADLS Gen2 Example") \
    .config(f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net", access_token) \
    .getOrCreate()

# Point the session catalog at an Iceberg Hadoop catalog backed by ADLS Gen2
spark.conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.spark_catalog.type", "hadoop")

table_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/iceberg/warehouse1"
spark.conf.set("spark.sql.catalog.spark_catalog.warehouse", table_path)

spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.my_database")

# Create a Delta table with UniForm enabled so it is also readable as Iceberg
spark.sql("""
    CREATE TABLE spark_catalog.my_database.my_iceberg_table2 (id bigint, data string)
    TBLPROPERTIES(
        'delta.enableIcebergCompatV2' = 'true',
        'delta.universalFormat.enabledFormats' = 'iceberg')
""")
Then describe the table and browse the storage account to confirm the setup (screenshots omitted).
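A quick verification sketch from the notebook (assuming the session and table above; this check is illustrative, not from the original answer):

# Inspect the table; UniForm should appear under Table Properties
spark.sql("DESCRIBE EXTENDED spark_catalog.my_database.my_iceberg_table2").show(truncate=False)

Once UniForm has generated the Iceberg metadata, the table directory in the storage account should contain an Iceberg metadata/ directory alongside the Delta _delta_log/.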
Upvotes: 0