Reputation: 482
I have been experiencing some (unexpected?) behavior where a catalog reference in Spark is not reflected in the Hive Metastore. I have followed the Spark configuration from the documentation, which looks like it should create a new catalog with the given name. Everything works as expected, except that the catalog is NOT inserted into the Hive Metastore. This has some implications, which I will showcase with an example.
Here is a sample script in PySpark:
import os
from pyspark.sql import SparkSession
deps = [
"org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.1",
"org.apache.iceberg:iceberg-aws:1.2.1",
"software.amazon.awssdk:bundle:2.17.257",
"software.amazon.awssdk:url-connection-client:2.17.257"
]
os.environ["PYSPARK_SUBMIT_ARGS"] = f"--packages {','.join(deps)} pyspark-shell"
os.environ["AWS_ACCESS_KEY_ID"] = "minioadmin"
os.environ["AWS_SECRET_ACCESS_KEY"] = "minioadmin"
os.environ["AWS_REGION"] = "eu-east-1"
catalog = "hive_catalog"
spark = (
    SparkSession.builder
    .appName("Iceberg Reader")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config(f"spark.sql.catalog.{catalog}", "org.apache.iceberg.spark.SparkCatalog")
    .config(f"spark.sql.catalog.{catalog}.type", "hive")
    .config(f"spark.sql.catalog.{catalog}.uri", "thrift://localhost:9083")
    .config(f"spark.sql.catalog.{catalog}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config(f"spark.sql.catalog.{catalog}.s3.endpoint", "http://localhost:9000")
    .config(f"spark.sql.catalog.{catalog}.warehouse", "s3a://lakehouse")
    .config("hive.metastore.uris", "thrift://localhost:9083")
    .enableHiveSupport()
    .getOrCreate()
)
# Raises error
spark.sql("CREATE NAMESPACE wrong_catalog.new_db;")
# Correct creation of namespace
spark.sql(f"CREATE NAMESPACE {catalog}.new_db;")
# Create table
spark.sql(f"CREATE TABLE {catalog}.new_db.new_table (col1 INT, col2 STRING);")
# Insert data
spark.sql(f"INSERT INTO {catalog}.new_db.new_table VALUES (1, 'first'), (2, 'second');")
# Read data
spark.sql(f"SELECT * FROM {catalog}.new_db.new_table;").show()
#+----+------+
#|col1|  col2|
#+----+------+
#|   1| first|
#|   2|second|
#+----+------+
# Read metadata
spark.sql(f"SELECT * FROM {catalog}.new_db.new_table.files;").show()
#+-------+--------------------+-----------+-------+------------+------------------+------------------+----------------+-----------------+----------------+--------------------+--------------------+------------+-------------+------------+-------------+--------------------+
#|content| file_path|file_format|spec_id|record_count|file_size_in_bytes| column_sizes| value_counts|null_value_counts|nan_value_counts| lower_bounds| upper_bounds|key_metadata|split_offsets|equality_ids|sort_order_id| readable_metrics|
#+-------+--------------------+-----------+-------+------------+------------------+------------------+----------------+-----------------+----------------+--------------------+--------------------+------------+-------------+------------+-------------+--------------------+
#| 0|s3a://lakehouse/n...| PARQUET| 0| 1| 652|{1 -> 47, 2 -> 51}|{1 -> 1, 2 -> 1}| {1 -> 0, 2 -> 0}| {}|{1 -> ���, 2 -> ...|{1 -> ���, 2 -> ...| null| [4]| null| 0|{{47, 1, 0, null,...|
#| 0|s3a://lakehouse/n...| PARQUET| 0| 1| 660|{1 -> 47, 2 -> 53}|{1 -> 1, 2 -> 1}| {1 -> 0, 2 -> 0}| {}|{1 -> ���, 2 -> ...|{1 -> ���, 2 -> ...| null| [4]| null| 0|{{47, 1, 0, null,...|
#+-------+--------------------+-----------+-------+------------+------------------+------------------+----------------+-----------------+----------------+--------------------+--------------------+------------+-------------+------------+-------------+--------------------+
So far this all seems fine: it created a namespace and a table, and inserted data into the table. However, querying the Hive Metastore reveals the problem (CTLGS):
|CTLG_ID|NAME|DESC |LOCATION_URI |
|-------|----|------------------------|----------------|
|1 |hive|Default catalog for Hive|s3a://lakehouse/|
It does NOT insert a new catalog with the given name. We can see, though, that the namespaces and tables actually have been inserted into the Hive Metastore (DBS and TBLS):
|DB_ID|DESC |DB_LOCATION_URI |NAME |OWNER_NAME |OWNER_TYPE|CTLG_NAME|
|-----|---------------------|-------------------------|-------|--------------|----------|---------|
|1 |Default Hive database|s3a://lakehouse/ |default|public |ROLE |hive |
|2 | |s3a://lakehouse/new_db.db|new_db |thijsvandepoll|USER |hive |
|TBL_ID|CREATE_TIME |DB_ID|LAST_ACCESS_TIME|OWNER |OWNER_TYPE|RETENTION |SD_ID|TBL_NAME |TBL_TYPE |VIEW_EXPANDED_TEXT|VIEW_ORIGINAL_TEXT|IS_REWRITE_ENABLED|
|------|-------------|-----|----------------|--------------|----------|-------------|-----|---------|--------------|------------------|------------------|------------------|
|1     |1683707647 |2    |80467           |thijsvandepoll|USER      |2147483647   |1    |new_table|EXTERNAL_TABLE|                  |                  |0                 |
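For context, here is how I picture what is happening (my reading, not a confirmed Iceberg internal): the table identifier that reaches the metastore consists only of the database and table name, while the Spark-side catalog name stays a session-local alias. A toy sketch of that mapping (the function `to_hms_key` is hypothetical, purely illustrative):

```python
# Toy model of how a Spark table identifier lands in the Hive Metastore.
# The Spark catalog prefix is dropped; the metastore files everything
# under its single built-in 'hive' catalog (the CTLG_NAME column).
def to_hms_key(spark_identifier: str) -> tuple:
    """Map 'catalog.db.table' (Spark side) to ('hive', db, table) (HMS side)."""
    _catalog, db, table = spark_identifier.split(".")
    return ("hive", db, table)  # CTLG_NAME is always 'hive'

# Two different Spark catalog names resolve to the same metastore entry:
to_hms_key("hive_catalog.new_db.new_table")   # -> ('hive', 'new_db', 'new_table')
to_hms_key("other_catalog.new_db.new_table")  # -> ('hive', 'new_db', 'new_table')
```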
This means that it uses the Hive default catalog instead of the configured name. I am not exactly sure whether this is expected behavior. Everything else works fine up to this point. However, the problem shows when we try to create the same namespace in another catalog:
import os
from pyspark.sql import SparkSession
deps = [
"org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.1",
"org.apache.iceberg:iceberg-aws:1.2.1",
"software.amazon.awssdk:bundle:2.17.257",
"software.amazon.awssdk:url-connection-client:2.17.257"
]
os.environ["PYSPARK_SUBMIT_ARGS"] = f"--packages {','.join(deps)} pyspark-shell"
os.environ["AWS_ACCESS_KEY_ID"] = "minioadmin"
os.environ["AWS_SECRET_ACCESS_KEY"] = "minioadmin"
os.environ["AWS_REGION"] = "eu-east-1"
catalog = "other_catalog"
spark = (
    SparkSession.builder
    .appName("Iceberg Reader")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config(f"spark.sql.catalog.{catalog}", "org.apache.iceberg.spark.SparkCatalog")
    .config(f"spark.sql.catalog.{catalog}.type", "hive")
    .config(f"spark.sql.catalog.{catalog}.uri", "thrift://localhost:9083")
    .config(f"spark.sql.catalog.{catalog}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config(f"spark.sql.catalog.{catalog}.s3.endpoint", "http://localhost:9000")
    .config(f"spark.sql.catalog.{catalog}.warehouse", "s3a://lakehouse")
    .config("hive.metastore.uris", "thrift://localhost:9083")
    .enableHiveSupport()
    .getOrCreate()
)
# Raises an error: the namespace already exists (in the single underlying catalog)
spark.sql(f"CREATE NAMESPACE {catalog}.new_db;")
# pyspark.sql.utils.AnalysisException: Namespace 'new_db' already exists
# Create another namespace
spark.sql(f"CREATE NAMESPACE {catalog}.other_db;")
# Try to access data from other catalog using current catalog
spark.sql(f"SELECT * FROM {catalog}.new_db.new_table;").show()
#+----+------+
#|col1|  col2|
#+----+------+
#|   1| first|
#|   2|second|
#+----+------+
Now we can see that even though we are referencing another catalog name, it still implicitly uses the Hive default catalog. We can verify this by viewing DBS in the Hive Metastore:
|DB_ID|DESC |DB_LOCATION_URI |NAME |OWNER_NAME |OWNER_TYPE|CTLG_NAME|
|-----|---------------------|---------------------------|--------|--------------|----------|---------|
|1 |Default Hive database|s3a://lakehouse/ |default |public |ROLE |hive |
|2 | |s3a://lakehouse/new_db.db |new_db |thijsvandepoll|USER |hive |
|3 | |s3a://lakehouse/other_db.db|other_db|thijsvandepoll|USER |hive |
Basically this means that Iceberg together with the Hive Metastore has no real notion of multiple catalogs: it is just a flat list of namespaces and tables. Effectively, there seems to be only a single catalog.
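Put differently, the catalog name only exists as a key prefix in the Spark config; strip it away and my two sessions above are configured identically. A small sketch (the helper `iceberg_hive_conf` is hypothetical, with values taken from my scripts):

```python
def iceberg_hive_conf(name, uri="thrift://localhost:9083", warehouse="s3a://lakehouse"):
    """Build the Spark config entries for an Iceberg Hive catalog named `name`.
    The name is only a prefix on the Spark side; the backend-facing settings
    (uri, warehouse, ...) determine which metastore and storage are used."""
    return {
        f"spark.sql.catalog.{name}": "org.apache.iceberg.spark.SparkCatalog",
        f"spark.sql.catalog.{name}.type": "hive",
        f"spark.sql.catalog.{name}.uri": uri,
        f"spark.sql.catalog.{name}.warehouse": warehouse,
    }

# Two differently named "catalogs" carry exactly the same backend-facing
# settings, so they resolve to the same metastore and warehouse:
a = iceberg_hive_conf("hive_catalog")
b = iceberg_hive_conf("other_catalog")
assert sorted(a.values()) == sorted(b.values())
```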
Can anyone help me understand what is going on? Am I missing some configuration? Is this expected behavior or a bug? Thanks in advance!
Upvotes: 3
Views: 1664