Luiz Tauffer

Reputation: 632

Write to Iceberg/Glue table from local PySpark session

I want to be able to operate (read/write) on an Iceberg table hosted on AWS Glue, from my local machine, using Python.

I have already:

I can already access (read-only) the remote Iceberg table from my local laptop using PyIceberg, and now I want to write data to it. The problem is that Athena imposes some strict limits on write operations. At the end of the day, I'd like to write to the Iceberg table using a dataframe-like interface from Python, and the only option for that seems to be PySpark for now.
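For reference, this is roughly what my current read-only access looks like with PyIceberg (a minimal sketch with placeholder catalog/database/table names; it assumes a Glue catalog, AWS credentials in the environment, and the pyarrow/pandas extras installed):

from pyiceberg.catalog import load_catalog

# Placeholder names; assumes AWS credentials are available in the environment
catalog = load_catalog("glue", **{"type": "glue"})
table = catalog.load_table("my_database.iceberg_table")

# Read-only: scan the table into a pandas DataFrame
df = table.scan().to_pandas()
print(df.head())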

So I'm trying to do that, running a local PySpark session on my laptop, using the configurations I found in these refs:

The setup code seems to run fine, with the prints very similar to the reference video:

from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, FloatType, LongType, StructType, StructField, StringType
import pyspark
import os

conf = (
    pyspark.SparkConf()
        .setAppName('luiz-session')
        #packages
        .set('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.1,software.amazon.awssdk:bundle:2.20.18,software.amazon.awssdk:url-connection-client:2.20.18,org.apache.spark:spark-hadoop-cloud_2.12:3.2.0')
        #SQL Extensions
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
        #Configuring Catalog
        .set('spark.sql.catalog.glue', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.glue.catalog-impl', 'org.apache.iceberg.aws.glue.GlueCatalog')
        .set('spark.sql.catalog.glue.warehouse', "s3://my-bucket/iceberg-data")
        .set('spark.sql.catalog.glue.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
        #AWS CREDENTIALS
        .set('spark.hadoop.fs.s3a.access.key', os.environ.get("AWS_ACCESS_KEY_ID"))
        .set('spark.hadoop.fs.s3a.secret.key', os.environ.get("AWS_SECRET_ACCESS_KEY"))
)

## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")

Now, when I try to run a query using this:

spark.sql("SELECT * FROM glue.iceberg_table LIMIT 10;").show()

I get the following error:

IllegalArgumentException: Cannot initialize Catalog implementation org.apache.iceberg.aws.glue.GlueCatalog: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog
    Missing org.apache.iceberg.aws.glue.GlueCatalog [java.lang.NoClassDefFoundError: software/amazon/awssdk/services/glue/model/InvalidInputException]

I've been trying to fix this by changing the conf and by copying the Iceberg jar releases to the Spark home folder, but no luck so far.

Upvotes: 5

Views: 8558

Answers (2)

MikeGM

Reputation: 1115

Here is a working example that runs locally and reads from and writes to an Iceberg table in Glue/on S3. I think the main issue in the question is that the jars are likely not loaded in the workers; loading them is achieved by setting PYSPARK_SUBMIT_ARGS before the session starts. Also, in my example, I explicitly set the AWS environment variables.

import subprocess
from awsume.awsumepy import awsume
import os
from pyspark import SparkConf, __version__ as pyspark_version
from pyspark.sql import SparkSession

# In case you are using awsume, otherwise just input your keys
profile_session = awsume("your profile")

### Enable loading the needed jars into the pyspark session
### Note, this also bakes in the credentials to be able to access S3/Glue

aws_bundles = ["kms", "dynamodb", "glue", "sts", "s3", "url-connection-client"]
aws_version = "2.21.24"
aws_jars = ",".join(
    [f"software.amazon.awssdk:{pkg}:{aws_version}" for pkg in aws_bundles]
)

creds = profile_session.get_credentials().get_frozen_credentials()

main_pyspark_version = ".".join(pyspark_version.split(".")[:-1])
# Ensure the versions for pyspark match below
os.environ[
    "PYSPARK_SUBMIT_ARGS"
] = f"--packages org.apache.iceberg:iceberg-spark-runtime-{main_pyspark_version}_2.12:1.4.1,{aws_jars},org.apache.spark:spark-hadoop-cloud_2.12:{pyspark_version} pyspark-shell"

# Not strictly necessary, but can be helpful to select a compatible java
# version in case you have multiple ones
os.environ["JAVA_HOME"] = (
    subprocess.check_output(["/usr/libexec/java_home", "-v", "11"]).strip().decode()
)
os.environ["AWS_ACCESS_KEY_ID"] = creds.access_key
os.environ["AWS_SECRET_ACCESS_KEY"] = creds.secret_key
os.environ["AWS_SESSION_TOKEN"] = creds.token



def configure_iceberg(s3_warehouse: str, catalog_name: str) -> SparkConf:
    return (
        SparkConf()
        .set(
            f"spark.sql.catalog.{catalog_name}",
            "org.apache.iceberg.spark.SparkCatalog",
        )
        .set(
            f"spark.sql.catalog.{catalog_name}.catalog-impl",
            "org.apache.iceberg.aws.glue.GlueCatalog",
        )
        # Note, the warehouse will generally only be used for *new* databases
        .set(
            f"spark.sql.catalog.{catalog_name}.warehouse",
            s3_warehouse,
        )
        .set(
            f"spark.sql.catalog.{catalog_name}.io-impl",
            "org.apache.iceberg.aws.s3.S3FileIO",
        )
        .set(
            "spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        )
    )


def get_spark_session(
    app_name: str, additional_config: SparkConf = None
) -> SparkSession:
    builder = (
        SparkSession.builder.appName(app_name)
        .config(
            "hive.metastore.client.factory.class",
            "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
        )
        .config("spark.sql.session.timeZone", "UTC")
        .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    )

    if additional_config:
        builder = builder.config(conf=additional_config)

    return builder.enableHiveSupport().getOrCreate()


sc = get_spark_session(
    "test",
    configure_iceberg("s3://...", "ice").set(
        "spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"
    ),
)

Generate a table (note: I have already created the temp_tables database in Glue, so the warehouse location actually used is the one defined on that database):

>>> sc.createDataFrame([(1, )], schema="x int").writeTo("ice.temp_tables.example_data_frame").createOrReplace()

Read table that was written:

>>> sc.table("ice.temp_tables.example_data_frame").show()
+---+
|  x|
+---+
|  1|
+---+

You can also verify that the files appear in S3. Note that, as mentioned above, if the database was already created in Glue, Iceberg will use the location defined there as the warehouse.
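If you only want to append to a table that already exists (the question's original goal), the V2 writer's append should work as well; a minimal sketch, reusing the session and table names from above:

# Append rows to the existing Iceberg table (sketch, same names as above)
sc.createDataFrame([(2,)], schema="x int").writeTo("ice.temp_tables.example_data_frame").append()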

Upvotes: 2

Luiz Tauffer

Reputation: 632

In the end, the only way I found to develop locally with Glue and Iceberg was to use the amazon/aws-glue-libs:glue_libs_4.0.0_image_01 Docker image with DATALAKE_FORMATS=iceberg, and to remove the spark.jars.packages setting from the Spark configuration.
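For reference, inside that container the session config ends up being the one from the question minus the spark.jars.packages line, since the image already provides the Iceberg and AWS SDK jars (a sketch; the bucket and catalog names are placeholders):

import pyspark
from pyspark.sql import SparkSession

# Inside amazon/aws-glue-libs:glue_libs_4.0.0_image_01 with DATALAKE_FORMATS=iceberg,
# the Iceberg/AWS SDK jars are already on the classpath, so no spark.jars.packages is needed.
conf = (
    pyspark.SparkConf()
        .setAppName('luiz-session')
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
        .set('spark.sql.catalog.glue', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.glue.catalog-impl', 'org.apache.iceberg.aws.glue.GlueCatalog')
        .set('spark.sql.catalog.glue.warehouse', 's3://my-bucket/iceberg-data')
        .set('spark.sql.catalog.glue.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()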

Refs:

Upvotes: 3
