Immelstrom

Reputation: 1

Read/write Nebula Graph in pySpark

I'm trying to read from and write to Nebula Graph from within pySpark and cannot figure out why it does not connect.

I'm working with Spark locally on my laptop and use the onETL library to connect to data sources other than Nebula.

I use pySpark version 3.5.2:

PS D:\work\PyCharmProjects\edt-test-spark-sandbox> spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.2
      /_/

Using Scala version 2.12.18, OpenJDK 64-Bit Server VM, 11.0.2
Branch HEAD
Compiled by user ubuntu on 2024-08-06T11:36:15Z
Revision bb7846dd487f259994fdc69e18e03382e3f64f42
Url https://github.com/apache/spark
Type --help for more information.

My Java version is 11:

PS D:\work\PyCharmProjects\edt-test-spark-sandbox> java --version
openjdk 11.0.2 2019-01-15
OpenJDK Runtime Environment 18.9 (build 11.0.2+9)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode)

According to the documentation (https://github.com/vesoft-inc/nebula-spark-connector), I must use version 3.6.0 of the Spark connector and Spark common, so I downloaded nebula-spark-connector-3.6.0.jar and nebula-spark-common-3.6.0.jar.

I have also downloaded scala-library-2.12.18.jar.

Here is a code fragment showing how I initialize the Spark session:

    def _initialize_spark_session(self):
        # Initialize new SparkSession with Postgres driver loaded
        maven_packages = Postgres.get_packages()
        nebula_jars = [
            "file:///d:/work/PyCharmProjects/edt-test-spark-sandbox/nebula-spark-connector/nebula-spark-common-3.6.0.jar",
            "file:///d:/work/PyCharmProjects/edt-test-spark-sandbox/nebula-spark-connector/nebula-spark-connector-3.6.0.jar",
            "file:///d:/work/PyCharmProjects/edt-test-spark-sandbox/nebula-spark-connector/scala-library-2.12.18.jar"
        ]
        spark = (
            SparkSession.builder.appName("edt_spark_transformation")
            .config("spark.jars", ",".join(nebula_jars))
            .config("spark.jars.packages", ",".join(maven_packages))
            .config("spark.executor.extraClassPath", ",".join(nebula_jars))
            .config("spark.driver.extraClassPath", ",".join(nebula_jars))
            .enableHiveSupport()  # for Hive
            .getOrCreate()
        )
        return spark
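
For reference, here is a minimal sketch of how these paths could be joined under the assumption that spark.jars expects a comma-separated list while the extraClassPath settings expect a regular Java classpath string (";" on Windows, ":" on Linux/macOS) with plain local paths rather than file:// URIs; this is my assumption, not something I have confirmed:

import os

from pyspark.sql import SparkSession

# Assumption: extraClassPath is joined with the OS path separator,
# while spark.jars stays comma-separated.
nebula_jar_paths = [
    r"d:\work\PyCharmProjects\edt-test-spark-sandbox\nebula-spark-connector\nebula-spark-common-3.6.0.jar",
    r"d:\work\PyCharmProjects\edt-test-spark-sandbox\nebula-spark-connector\nebula-spark-connector-3.6.0.jar",
    r"d:\work\PyCharmProjects\edt-test-spark-sandbox\nebula-spark-connector\scala-library-2.12.18.jar",
]
extra_classpath = os.pathsep.join(nebula_jar_paths)  # ";" on Windows

spark = (
    SparkSession.builder.appName("edt_spark_transformation")
    .config("spark.jars", ",".join(nebula_jar_paths))
    .config("spark.driver.extraClassPath", extra_classpath)
    .config("spark.executor.extraClassPath", extra_classpath)
    .getOrCreate()
)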

I use sample code like this to interact with Nebula:

df = (
    spark.read.format("com.vesoft.nebula.connector.NebulaDataSource")
    .option("type", "vertex")
    .option("spaceName", "basketballplayer")
    .option("label", "player")
    .option("returnCols", "name,age")
    .option("metaAddress", "metad0:9559")
    .option("partitionNumber", 1)
    .load()
)

Then I try to run the Spark application like this:

spark-submit --packages org.postgresql:postgresql:42.2.19 --jars file:///d:/work/PyCharmProjects/edt-test-spark-sandbox/nebula-spark-connector/nebula-spark-common-3.6.0.jar,file:///d:/work/PyCharmProjects/edt-test-spark-sandbox/nebula-spark-connector/nebula-spark-connector-3.6.0.jar,file:///d:/work/PyCharmProjects/edt-test-spark-sandbox/nebula-spark-connector/scala-library-2.12.18.jar product.py

But I get the following error:

Traceback (most recent call last):
  File "D:\work\PyCharmProjects\edt-test-spark-sandbox\product.py", line 689, in <module>
    df = connector.spark.read.format(
  File "d:\work\soft\Python\lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\sql\readwriter.py", line 314, in load
  File "d:\work\soft\Python\lib\site-packages\pyspark\python\lib\py4j-0.10.9.7-src.zip\py4j\java_gateway.py", line 1322, in __call__
  File "d:\work\soft\Python\lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\errors\exceptions\captured.py", line 179, in deco
  File "d:\work\soft\Python\lib\site-packages\pyspark\python\lib\py4j-0.10.9.7-src.zip\py4j\protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o2139.load.
: java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/v2/ReadSupport
        at java.base/java.lang.ClassLoader.defineClass1(Native Method)
        at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1016)
        at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
        at java.base/java.net.URLClassLoader.defineClass(URLClassLoader.java:550)
        at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:458)
        at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:452)
        at java.base/java.security.AccessController.doPrivileged(Native Method)
        at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:451)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
        at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$3(DataSource.scala:633)
        at scala.util.Try$.apply(Try.scala:213)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:208)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.sources.v2.ReadSupport
        at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
        ... 28 more

24/09/02 11:59:58 INFO SparkContext: Invoking stop() from shutdown hook
24/09/02 11:59:58 INFO SparkContext: SparkContext is stopping with exitCode 0.
24/09/02 11:59:58 INFO SparkUI: Stopped Spark web UI at http://172.18.69.9:4040
24/09/02 11:59:58 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
24/09/02 11:59:58 INFO MemoryStore: MemoryStore cleared
24/09/02 11:59:58 INFO BlockManager: BlockManager stopped
24/09/02 11:59:58 INFO BlockManagerMaster: BlockManagerMaster stopped
24/09/02 11:59:58 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
24/09/02 11:59:58 WARN SparkEnv: Exception while deleting Spark temp dir: C:\Users\┬шЄрышщ\AppData\Local\Temp\spark-f0cffeae-c27a-43ad-a25c-31b49fe6005a\userFiles-bd0bc2fb-ae10-406e-97b0-2dc58c0d1d10
java.io.IOException: Failed to delete: C:\Users\┬шЄрышщ\AppData\Local\Temp\spark-f0cffeae-c27a-43ad-a25c-31b49fe6005a\userFiles-bd0bc2fb-ae10-406e-97b0-2dc58c0d1d10\nebula-spark-connector-3.6.0.jar
        at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:147)
        at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:117)
        at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:130)
        at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:117)
        at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:90)
        at org.apache.spark.util.SparkFileUtils.deleteRecursively(SparkFileUtils.scala:121)
        at org.apache.spark.util.SparkFileUtils.deleteRecursively$(SparkFileUtils.scala:120)
        at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1126)
        at org.apache.spark.SparkEnv.stop(SparkEnv.scala:108)
        at org.apache.spark.SparkContext.$anonfun$stop$25(SparkContext.scala:2305)
        at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1375)
        at org.apache.spark.SparkContext.stop(SparkContext.scala:2305)
        at org.apache.spark.SparkContext.stop(SparkContext.scala:2211)
        at org.apache.spark.SparkContext.$anonfun$new$34(SparkContext.scala:681)
        at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
        at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
        at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at scala.util.Try$.apply(Try.scala:213)
        at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
24/09/02 11:59:58 INFO SparkContext: Successfully stopped SparkContext
24/09/02 11:59:58 INFO ShutdownHookManager: Shutdown hook called
24/09/02 11:59:58 INFO ShutdownHookManager: Deleting directory C:\Users\┬шЄрышщ\AppData\Local\Temp\spark-f0cffeae-c27a-43ad-a25c-31b49fe6005a\pyspark-c60ed4f0-89eb-43e1-8fa1-34c6fda0bf70
24/09/02 11:59:58 INFO ShutdownHookManager: Deleting directory C:\Users\┬шЄрышщ\AppData\Local\Temp\spark-04bffbbb-3ce0-4b25-8055-667f8b9d45e0
24/09/02 11:59:58 INFO ShutdownHookManager: Deleting directory C:\Users\┬шЄрышщ\AppData\Local\Temp\spark-f0cffeae-c27a-43ad-a25c-31b49fe6005a

Does anyone know how to fix this issue?

What I expect is for read/write operations against Nebula Graph to work from pySpark.

Upvotes: 0

Views: 99

Answers (1)

changyuan wang

Reputation: 1

java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/v2/ReadSupport

The Spark version is incompatible: org.apache.spark.sql.sources.v2.ReadSupport belongs to the old DataSource V2 API that existed in Spark 2.x and was removed in Spark 3, so the connector jar you are loading was built against a Spark version your runtime no longer provides.

You can use Spark 3.0 - 3.3 to run the nebula-spark-connector.
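
For example, a minimal sketch with pySpark pinned to 3.3.x, pulling the Spark 3 build of the connector from Maven instead of a locally downloaded jar. The Maven coordinate com.vesoft:nebula-spark-connector_3.0:3.6.0 is my assumption based on the connector README; please verify the exact artifact name and version there:

from pyspark.sql import SparkSession

# Sketch: run this on Spark 3.3.x with the Spark 3 build of the connector.
spark = (
    SparkSession.builder.appName("nebula_read_test")
    # Assumed coordinate; check the nebula-spark-connector README.
    .config("spark.jars.packages", "com.vesoft:nebula-spark-connector_3.0:3.6.0")
    .getOrCreate()
)

df = (
    spark.read.format("com.vesoft.nebula.connector.NebulaDataSource")
    .option("type", "vertex")
    .option("spaceName", "basketballplayer")
    .option("label", "player")
    .option("returnCols", "name,age")
    .option("metaAddress", "metad0:9559")
    .option("partitionNumber", 1)
    .load()
)
df.show()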

Upvotes: 0
