Reputation: 43
Have a simple program as shown below
import pyspark

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)
spark = builder.getOrCreate()

spark._jsc.hadoopConfiguration().set(
    "fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
)
spark._jsc.hadoopConfiguration().set(
    "fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"
)

df = spark.read.format("delta").load(
    "gs://org/delta/bronze/mongodb/registration/audits"
)
df.show()
This is packaged into a container using the Dockerfile below:
FROM varunmallya/spark-pi:3.2.1
USER root
ADD gcs-connector-hadoop2-latest.jar $SPARK_HOME/jars
WORKDIR /app
COPY main.py .
This app is then deployed as a SparkApplication on Kubernetes using the spark-on-k8s operator.
I expected to see 20 rows of data, but instead got this exception:
java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF
However, when I run this in a local Jupyter notebook I can see the desired output. I have added the necessary package, io.delta:delta-core_2.12:1.2.0, via the CRD and have also ensured that gcs-connector-hadoop2-latest.jar is made available.
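For reference, the dependency is declared through the SparkApplication manifest roughly as below (an illustrative excerpt only; the rest of the spec is omitted):

spec:
  deps:
    # Delta Lake package resolved by the operator/spark-submit at launch time
    packages:
      - io.delta:delta-core_2.12:1.2.0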
What could the issue be?
Upvotes: 4
Views: 233
Reputation: 9691
Could you try the following Dockerfile:
FROM datamechanics/spark:3.1.1-hadoop-3.2.0-java-8-scala-2.12-python-3.8-dm17
USER root
WORKDIR /app
COPY main.py .
And then try deploying the SparkApplication:
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: sparky-pi
  namespace: spark
spec:
  type: Python
  mode: cluster
  pythonVersion: "3"
  image: <YOUR_IMAGE_GOES_HERE>:latest
  mainApplicationFile: local:///app/main.py
  sparkVersion: "3.1.1"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.1.1
    serviceAccount: spark
  executor:
    serviceAccount: spark
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.1.1
I ran this on my Kubernetes cluster and was able to get the expected output.
I think here the base image datamechanics/spark:3.1.1-hadoop-3.2.0-java-8-scala-2.12-python-3.8-dm17 is key. Props to the folks who put it together!
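In case the Delta package is not already bundled in that image, it could also be declared in the manifest itself. A minimal sketch, assuming the operator's spec.deps.packages field and a delta-core release that matches the image's Scala 2.12 / Spark 3.1.1 build (the version placeholder follows the manifest's existing placeholder style):

spec:
  deps:
    # Maven coordinate resolved at submit time; replace the placeholder with a
    # delta-core release built for the image's Spark/Scala versions
    packages:
      - io.delta:delta-core_2.12:<DELTA_VERSION>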
Upvotes: 3