Daniel Brenner

Reputation: 31

Databricks Error: Multiple sources found for iceberg - Uniform Iceberg Read & the Iceberg runtime jar

Using Azure Databricks on runtime 14.3 LTS, I am testing an Apache Iceberg storage implementation with the Hadoop catalog. When I try to write a table, I get this error:

Multiple sources found for iceberg (com.databricks.sql.transaction.tahoe.uniform.sources.IcebergBrowseOnlyDataSource, org.apache.iceberg.spark.source.IcebergSource), please specify the fully qualified class name.

Is there a way to override Databricks's UniForm reader so that we can use the Apache Iceberg runtime exclusively on our cluster?
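The error message itself suggests one possible workaround: refer to the source by its fully qualified class name instead of the ambiguous short name `iceberg`. A minimal sketch, assuming the Iceberg runtime jar is installed on the cluster (the container, account, and path below are placeholders):

```python
# Hedged sketch: name the Apache Iceberg source class explicitly so Spark
# does not have to resolve the ambiguous short name "iceberg".
df = spark.range(10).withColumnRenamed("id", "col1")
(df.write
    .format("org.apache.iceberg.spark.source.IcebergSource")
    .mode("append")
    .save("abfss://<container>@<account>.dfs.core.windows.net/my_database/my_iceberg_table"))
```

Whether this bypasses the UniForm reader on a given Databricks runtime is not guaranteed; it only disambiguates the data source lookup.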

My Spark cluster configuration and Spark config are:

Libraries installed on the cluster:
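(The original screenshots did not survive. As a hedged reconstruction, a Hadoop-catalog Iceberg setup typically carries cluster Spark config entries along these lines; the warehouse path is a placeholder, and the exact values on the original cluster are not known.)

```
spark.sql.catalog.spark_catalog org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.spark_catalog.type hadoop
spark.sql.catalog.spark_catalog.warehouse abfss://<container>@<account>.dfs.core.windows.net/iceberg/warehouse
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
```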

Simple code:

import logging
from pyspark.sql import SparkSession
from data_lake_library.config.config import AzureKeyVaultParams
from data_lake_library.config.secrets_manager import AzureKeyVaultSecrets

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def main():
    # Key Vault name and secret name for retrieving credentials
    keyvault_name = ""  # Replace with your actual Key Vault name 
    secret_name = ""  # Replace with your actual secret name in Key Vault

    # Initialize Azure Key Vault
    logger.info(f"Initializing Azure Key Vault with name: {keyvault_name}")
    akv_params = AzureKeyVaultParams(keyvault_name=keyvault_name)
    akv = AzureKeyVaultSecrets(params=akv_params)

    # Retrieve the SAS token from Key Vault
    logger.info(f"Retrieving secret from Azure Key Vault: {secret_name}")
    access_token = akv.get_secret(secret_name)
    logger.info("Successfully retrieved SAS token from Azure Key Vault")

    # Set ADLS Gen2 configurations (service principal, OAuth token, or account key)
    storage_account_name = "" # adls storage account name
    container_name = "" # container using to test

    # Initialize Spark session
    logger.info("Initializing Spark session.")
    spark = SparkSession.builder \
        .appName("Iceberg ADLS Gen2 Example") \
        .config(f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net", access_token) \
        .getOrCreate()

    logger.info("Spark session created successfully.")

    # Define the table path where the Iceberg table will be stored in ADLS Gen2
    table_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/my_database/my_iceberg_table"

    # Create a new database in ADLS Gen2 under the Hadoop-based Iceberg catalog
    logger.info("Creating database in ADLS Gen2 under the Hadoop-based Iceberg catalog.")
    spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.my_database")
    logger.info("Database created successfully (if not already present).")

    # Create an Iceberg table with an explicit path for storage in ADLS Gen2
    logger.info("Creating Iceberg table in ADLS Gen2.")
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS spark_catalog.my_database.my_iceberg_table (
            col1 INT,
            col2 STRING
        )
        USING iceberg
        LOCATION '{table_path}'
    """)
    logger.info(f"Iceberg table created successfully at location: {table_path}")

    # Stop the Spark session
    logger.info("Stopping Spark session.")
    spark.stop()

if __name__ == "__main__":
    main()

Full error:

Multiple sources found for iceberg (com.databricks.sql.transaction.tahoe.uniform.sources.IcebergBrowseOnlyDataSource, org.apache.iceberg.spark.source.IcebergSource), please specify the fully qualified class name.

File , line 62, in <module>
---> 62 main()
File , line 48, in main
---> 48 spark.sql(f"""
             CREATE TABLE IF NOT EXISTS spark_catalog.my_database.my_iceberg_table (
                 col1 INT,
                 col2 STRING
             )
             USING iceberg
         """)
File /databricks/spark/python/pyspark/instrumentation_utils.py:47, in _wrap_function.<locals>.wrapper(*args, **kwargs)
---> 47 res = func(*args, **kwargs)
File /databricks/spark/python/pyspark/sql/session.py:1748, in SparkSession.sql(self, sqlQuery, args, **kwargs)
-> 1748 return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1355, in JavaMember.__call__(self, *args)
-> 1355 return_value = get_return_value(answer, self.gateway_client, self.target_id, self.name)
File /databricks/spark/python/pyspark/errors/exceptions/captured.py:230, in capture_sql_exception.<locals>.deco(*a, **kw)
--> 230 raise converted from None

Upvotes: 0

Views: 382

Answers (1)

JayashankarGS

Reputation: 8020

This error is raised because two classes were found for the format iceberg: com.databricks.sql.transaction.tahoe.uniform.sources.IcebergBrowseOnlyDataSource and org.apache.iceberg.spark.source.IcebergSource.

So, in Databricks, use the Delta Lake Universal Format (UniForm) to create the table with the Iceberg source.

Code

storage_account_name = "jadls"
container_name = "data"

spark = (
    SparkSession.builder
    .appName("Iceberg ADLS Gen2 Example")
    .config(f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net", access_token)
    .getOrCreate()
)

spark.conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.spark_catalog.type", "hadoop")
table_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/iceberg/warehouse1"
spark.conf.set("spark.sql.catalog.spark_catalog.warehouse", table_path)
spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.my_database")

spark.sql("""
    CREATE TABLE spark_catalog.my_database.my_iceberg_table2 (id bigint, data string)
    TBLPROPERTIES (
        'delta.enableIcebergCompatV2' = 'true',
        'delta.universalFormat.enabledFormats' = 'iceberg'
    )
""")

Then describe the table to verify the UniForm properties.
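For example (a minimal check against the table created above; this assumes the same Spark session):

```python
# Show the table's metadata; with UniForm enabled, the table properties
# should include 'delta.universalFormat.enabledFormats' = 'iceberg'.
spark.sql(
    "DESCRIBE EXTENDED spark_catalog.my_database.my_iceberg_table2"
).show(truncate=False)
```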

(screenshot: DESCRIBE TABLE output showing the UniForm table properties)

And the resulting files in the storage account:

(screenshot: Iceberg metadata files written to the ADLS Gen2 container)

Upvotes: 0
