Varun Mallya

Reputation: 43

ClassCastException Encountered When Trying to Connect to Delta Lake from the Spark K8s Operator

I have a simple program, shown below:

import pyspark

# Enable the Delta Lake SQL extension and catalog on the SparkSession.
builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

spark = builder.getOrCreate()

# Point Hadoop at the GCS connector classes for gs:// paths.
spark._jsc.hadoopConfiguration().set(
    "fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
)
spark._jsc.hadoopConfiguration().set(
    "fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"
)

# Read the Delta table from GCS and display the first 20 rows.
df = spark.read.format("delta").load(
    "gs://org/delta/bronze/mongodb/registration/audits"
)
df.show()

This is packaged into a container image using the Dockerfile below:

FROM varunmallya/spark-pi:3.2.1
USER root
ADD gcs-connector-hadoop2-latest.jar $SPARK_HOME/jars
WORKDIR /app
COPY main.py .

The app is then deployed as a SparkApplication on Kubernetes using the spark-on-k8s operator.

I expected to see 20 rows of data, but instead got this exception:

java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF

However, when I run this in a local Jupyter notebook I see the expected output. I have added the necessary package, io.delta:delta-core_2.12:1.2.0, via the CRD and have also ensured that gcs-connector-hadoop2-latest.jar is made available.
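
For reference, that package is declared in the SparkApplication CRD roughly as below (a sketch only: deps.packages is the operator's v1beta2 field for passing Maven coordinates, and the metadata/image values here are placeholders rather than the actual manifest):

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: delta-gcs-read                  # placeholder name
spec:
  type: Python
  mode: cluster
  image: <MY_IMAGE>:latest              # image built from the Dockerfile above
  mainApplicationFile: local:///app/main.py
  deps:
    packages:
      - io.delta:delta-core_2.12:1.2.0  # passed to spark.jars.packages by the operator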

What could the issue be?

Upvotes: 4

Views: 233

Answers (1)

Benjamin Tan Wei Hao

Reputation: 9691

Could you try the following Dockerfile:

FROM datamechanics/spark:3.1.1-hadoop-3.2.0-java-8-scala-2.12-python-3.8-dm17
USER root
WORKDIR /app
COPY main.py .

And then try deploying the SparkApplication:

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: sparky-pi
  namespace: spark
spec:
  type: Python
  mode: cluster
  pythonVersion: "3"
  image: <YOUR_IMAGE_GOES_HERE>:latest
  mainApplicationFile: local:///app/main.py
  sparkVersion: "3.1.1"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.1.1
    serviceAccount: spark
  executor:
    serviceAccount: spark
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.1.1

I ran this on my Kubernetes cluster and was able to get the expected output.

I think the base image datamechanics/spark:3.1.1-hadoop-3.2.0-java-8-scala-2.12-python-3.8-dm17 is the key here. Props to the folks who put it together!
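
The ClassCastException above (a SerializedLambda that cannot be assigned to a scala.Function1) usually points to mismatched Spark/Scala builds somewhere on the classpath, so it can be worth printing what the driver is actually running and cross-checking it against the Delta artifact (delta-core_2.12 is a Scala 2.12 build). A minimal sketch, to be added to the main.py above where spark is already defined:

# Print the Spark version and the extra packages this session was started with,
# to cross-check them against the Delta and GCS connector artifacts.
print("Spark version:", spark.version)
print("spark.jars.packages:", spark.conf.get("spark.jars.packages", "<not set>"))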

Source: https://towardsdatascience.com/optimized-docker-images-for-apache-spark-now-public-on-dockerhub-1f9f8fed1665

Upvotes: 3
