Thelin90

Reputation: 37

PySpark Delta Table - generate symlink [java.lang.NoSuchMethodError]

I am trying to generate a symlink format manifest (symlink_format_manifest) for a Delta table stored on S3.

To do this, I need to follow the docs: instructions and s3 setup

I am on a MacBook Pro, with environment variables configured in my ~/.zshrc for this small POC:

export PYSPARK_PYTHON=<poetry_python_path>/bin/python3
export PYSPARK_DRIVER=<poetry_python_path>/bin/python3
export JAVA_HOME="/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home"
export SPARK_HOME=<poetry_python_path>/site-packages/pyspark
export PYARROW_IGNORE_TIMEZONE=1

I set up a small PySpark project, where I create my SparkSession:

from pyspark.sql import SparkSession
import findspark
import boto3


def create_session() -> SparkSession:
    findspark.init()

    spark_session = SparkSession.builder.appName("delta_session") \
        .master("local[*]") \
        .getOrCreate()

    sparkContext = spark_session.sparkContext

    boto_default_session = boto3.setup_default_session()

    boto_session = boto3.Session(
        botocore_session=boto_default_session, profile_name="dev", region_name="eu-west-1"
    )
    credentials = boto_session.get_credentials()

    print(
        f"Hadoop version = {sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion()}"
    )

    hadoopConfiguration = sparkContext._jsc.hadoopConfiguration()
    hadoopConfiguration.set(
        "fs.s3a.aws.credentials.provider", 
        "com.amazonaws.auth.profile.ProfileCredentialsProvider"
    )
    hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
    hadoopConfiguration.set("fs.s3a.awsAccessKeyId", credentials.access_key)
    hadoopConfiguration.set("fs.s3a.awsSecretAccessKey", credentials.secret_key)
    hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")

    return spark_session
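
As an aside, the Delta and S3A settings could presumably also be set on the builder itself rather than through the spark-submit flags shown further down. The following is only a minimal sketch with placeholder credentials, not my actual setup:

from pyspark.sql import SparkSession

# Sketch only: same Delta + S3A configuration passed on the builder.
# <access-key> / <secret-key> are placeholders.
spark_session = (
    SparkSession.builder.appName("delta_session")
    .master("local[*]")
    .config(
        "spark.jars.packages",
        "io.delta:delta-core_2.12:0.8.0,org.apache.hadoop:hadoop-aws:3.2.0",
    )
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    # Options prefixed with spark.hadoop. are copied into the Hadoop configuration
    .config("spark.hadoop.fs.s3a.access.key", "<access-key>")
    .config("spark.hadoop.fs.s3a.secret.key", "<secret-key>")
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
    .getOrCreate()
)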

I then run:

spark_session = create_session()

from delta.tables import *

delta_table = DeltaTable.forPath(spark_session, "s3a://<my-path-to-delta-table>")

# This works
df = delta_table.toDF()
print(df.show(10))

# This fails
delta_table.generate("symlink_format_manifest")

1. I am able to retrieve the Delta files and create a DataFrame; all looks good.

2. I then try to call delta_table.generate, and I get this error:

Traceback (most recent call last):
  File "/run.py", line 33, in <module>
    delta_table.generate("symlink_format_manifest")
  File "/private/var/folders/c8/sj3rz_k14cs58nqwr3m9zsxc0000gq/T/spark-ba2ce53e-c9f8-49d4-98d5-21d9581b05f4/userFiles-b6d820f0-4e96-4e27-8808-a14b9e93928a/io.delta_delta-core_2.12-0.7.0.jar/delta/tables.py", line 74, in generate
  File "<poetry_python_path>/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "<poetry_python_path>/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "<poetry_python_path>/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o34.generate.
: java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.ScalaUDF$.apply$default$6()Z
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.$anonfun$generatePartitionPathExpression$1(GenerateSymlinkManifest.scala:350)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
    at scala.collection.immutable.List.flatMap(List.scala:355)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.generatePartitionPathExpression(GenerateSymlinkManifest.scala:349)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.generatePartitionPathExpression$(GenerateSymlinkManifest.scala:345)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.generatePartitionPathExpression(GenerateSymlinkManifest.scala:41)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.withRelativePartitionDir(GenerateSymlinkManifest.scala:338)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.writeManifestFiles(GenerateSymlinkManifest.scala:262)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.$anonfun$generateFullManifest$1(GenerateSymlinkManifest.scala:180)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withJobDescription(DeltaProgressReporter.scala:53)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode(DeltaProgressReporter.scala:32)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode$(DeltaProgressReporter.scala:27)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.withStatusCode(GenerateSymlinkManifest.scala:41)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.$anonfun$recordManifestGeneration$1(GenerateSymlinkManifest.scala:365)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
    at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.recordOperation(GenerateSymlinkManifest.scala:41)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.recordDeltaOperation(GenerateSymlinkManifest.scala:41)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.recordManifestGeneration(GenerateSymlinkManifest.scala:364)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.generateFullManifest(GenerateSymlinkManifest.scala:167)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.generateFullManifest$(GenerateSymlinkManifest.scala:165)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.generateFullManifest(GenerateSymlinkManifest.scala:41)
    at org.apache.spark.sql.delta.commands.DeltaGenerateCommand$.$anonfun$modeNameToGenerationFunc$1(DeltaGenerateCommand.scala:58)
    at org.apache.spark.sql.delta.commands.DeltaGenerateCommand$.$anonfun$modeNameToGenerationFunc$1$adapted(DeltaGenerateCommand.scala:58)
    at org.apache.spark.sql.delta.commands.DeltaGenerateCommand.run(DeltaGenerateCommand.scala:50)
    at io.delta.tables.execution.DeltaTableOperations.executeGenerate(DeltaTableOperations.scala:54)
    at io.delta.tables.execution.DeltaTableOperations.executeGenerate$(DeltaTableOperations.scala:48)
    at io.delta.tables.DeltaTable.executeGenerate(DeltaTable.scala:45)
    at io.delta.tables.DeltaTable.generate(DeltaTable.scala:176)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

I call the application with:

    poetry run spark-submit --packages "io.delta:delta-core_2.12:0.8.0,com.amazonaws:aws-java-sdk-pom:1.11.375,org.apache.hadoop:hadoop-aws:3.2.0" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore" run.py
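
For completeness: with the DeltaSparkSessionExtension enabled as in the command above, the same manifest generation can, as far as I know, also be expressed in SQL. This is only a sketch reusing the placeholder path from earlier:

# Sketch only: SQL form of delta_table.generate("symlink_format_manifest"),
# using the same <my-path-to-delta-table> placeholder as above.
spark_session.sql(
    "GENERATE symlink_format_manifest "
    "FOR TABLE delta.`s3a://<my-path-to-delta-table>`"
)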

What I tried:

py4j.protocol.Py4JJavaError: An error occurred while calling z:io.delta.tables.DeltaTable.forPath.
: java.lang.NumberFormatException: For input string: "100M"
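
(As a side note, as far as I understand this "100M" error tends to come from mixing hadoop-aws 3.x with an older bundled Hadoop, whose configuration parser cannot read the "100M" default. A commonly suggested workaround, sketched here and not part of my original setup, is to override the value with a plain byte count:)

# Sketch only (not my original setup): replace the "100M" default with a
# numeric byte value that older Hadoop Configuration.getLong() can parse.
spark_session.sparkContext._jsc.hadoopConfiguration().set(
    "fs.s3a.multipart.size", "104857600"
)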

It looks to me like org.apache.spark.sql.catalyst.expressions.ScalaUDF$.apply$default$6()Z is not callable for some reason, and I can't find anything more to install.

My pyproject.toml:

[tool.poetry]
name = "..."
version = "1.0.0"
description = "..."
authors = ["..."]

[tool.poetry.dependencies]
python = "3.7.8"
pre-commit = "^2.8.2"
pyspark = {version="3.1.1", optional=true, extras=["sql"]}
findspark = "^1.4.2"
boto3 = "*"
pyarrow = "3.0.0"

[tool.poetry.dev-dependencies]
pytest = "6.1.1"
ipdb = "0.13.3"
pytest-cov = "2.10.1"

Grateful for any input from anyone who has encountered the same issue.


UPDATE

Based on the comment from Alex, I resolved the issue by downgrading Spark to 3.0.2, as described in the accepted answer below.

Upvotes: 2

Views: 2663

Answers (1)

Alex Ott

Reputation: 87299

You need to downgrade to Spark 3.0.2 to use Delta 0.8.0. Unfortunately, Spark 3.1.1 changed many of the internals that Delta relies on under the hood, and this breaks binary compatibility. Most probably, your specific problem is caused by SPARK-32154, which changed the parameters of ScalaUDF (this line).
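
A quick way to check for this mismatch from the PySpark side (just a sketch; the "3.0" prefix reflects the Spark line that Delta 0.8.0 was built against):

from pyspark.sql import SparkSession

# Sketch: verify the running Spark version matches what Delta 0.8.0 expects (Spark 3.0.x).
spark = SparkSession.builder.getOrCreate()
assert spark.version.startswith("3.0"), (
    f"Delta 0.8.0 expects Spark 3.0.x, but this session runs Spark {spark.version}"
)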

Upvotes: 3
