Taimoor Abbasi

Reputation: 91

PySpark - NoClassDefFoundError: kafka/common/TopicAndPartition

I'm running Spark version 2.3.0.2.6.5.1175-1 with Python 3.6.8 on Ambari. While submitting the application, I get the following logs in stderr:

22/06/15 12:29:31 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
Exception in thread "Thread-10" java.lang.NoClassDefFoundError: kafka/common/TopicAndPartition
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    at java.lang.Class.privateGetPublicMethods(Class.java:2902)
    at java.lang.Class.getMethods(Class.java:1615)
    at py4j.reflection.ReflectionEngine.getMethodsByNameAndLength(ReflectionEngine.java:345)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:305)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: kafka.common.TopicAndPartition
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 12 more
22/06/15 12:29:33 ERROR ApplicationMaster: User application exited with status 1

Following are the stdout logs

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1062, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 908, in send_command
    response = connection.send_command(command)
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1067, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
Traceback (most recent call last):
  File "read_stream.py", line 13, in <module>
    stream.start(callback=callback)
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/__pyfiles__/OwlsenseStream.py", line 125, in start
    output = self.readStream().foreachRDD(lambda rdd: self.process(rdd, callback))
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/__pyfiles__/OwlsenseStream.py", line 70, in readStream
    messageHandler=lambda x: (x.topic, x.message))
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/pyspark.zip/pyspark/streaming/kafka.py", line 150, in createDirectStream
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/py4j-0.10.6-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o93.createDirectStreamWithMessageHandler

I'm providing the following JAR files:

spark-sql-kafka-0-10_2.11-2.3.0.jar

spark-streaming-kafka-0-8_2.11-2.3.0.jar

metrics-core-2.2.0.jar

Is this some kind of configuration issue or is there something wrong in the code?

Edit: I am using Livy to submit the job to the cluster. Below is the POST request code:

import json
import requests

headers = {
    'X-Requested-By': 'admin',
    'Content-Type': 'application/json',
}

data = {
    "numExecutors": stream['executors'],
    "executorCores": stream['cores'],
    "executorMemory": stream['memory'],
    "driverMemory": "2g",
    "file": stream['path'],
    "queue": "default",
    "pyFiles": [
        "hdfs:///user/bds/elastic_search/es_pyspark_handler.py",
        "hdfs:///user/bds/realtime/OwlsenseStream.py"
    ],
    "conf": {
        "spark.jars.packages": "org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,com.yammer.metrics:metrics-core:2.2.0"
    },
    "name": stream['name']
}

data = json.dumps(data)

# Submit the batch to the Livy REST API
response = requests.post(url=f"http://{IP_ADDRESS}:8998/batches", headers=headers, data=data,
                         verify=False)

Upvotes: 0

Views: 717

Answers (2)

OneCricketeer

Reputation: 191733

You also need to add the kafka-clients dependency.

Also, don't add JARs manually; use the packages option to download them from Maven. You don't need both Spark Streaming and Spark Structured Streaming either; pick only one...

from pyspark.sql import SparkSession

# TODO: Ensure these match cluster environment
scala_version = '2.11'
spark_version = '2.3.0'
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.2.0'  # TODO: match Kafka version
]
spark = SparkSession.builder \
    .master("local") \
    .appName("kafka-example") \
    .config("spark.jars.packages", ",".join(packages)) \
    .getOrCreate()

Also, HDP 2.6.5 is very old, and out of support. I suggest you upgrade to something else; for example, use Apache Zeppelin or Jupyter to run Spark notebooks.
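Since the job in the question is submitted through Livy rather than spark-submit, here is a minimal sketch of passing the packages through the Livy batch conf instead of shipping JARs by hand. The HDFS path and Livy host are placeholders, and the kafka-clients version is an assumption you should match to your brokers:

import json
import requests

# Let Spark pull the dependencies from Maven via spark.jars.packages
packages = ",".join([
    "org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0",
    "org.apache.kafka:kafka-clients:3.2.0",  # TODO: match Kafka version
])

data = {
    "file": "hdfs:///path/to/read_stream.py",  # placeholder: your driver script on HDFS
    "conf": {"spark.jars.packages": packages},
}

# Submit the batch to the Livy REST API (placeholder host)
response = requests.post(
    url="http://LIVY_HOST:8998/batches",
    headers={"X-Requested-By": "admin", "Content-Type": "application/json"},
    data=json.dumps(data),
)
print(response.json())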

Upvotes: 1

Taimoor Abbasi

Reputation: 91

I was unable to run the code on those versions; there was probably something wrong with the JAR versions. I changed the versions to the following:

  • Spark version 2.3.2

  • Kafka 2.8

  • Pyspark 2.3.2

  • Python 3.7

  • Java 1.8.*

I used the following packages:

--packages com.yammer.metrics:metrics-core:2.2.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2 
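For reference, the same two packages can also be supplied through the Livy batch conf used in the question; a sketch, assuming the rest of the payload stays unchanged:

conf = {
    "spark.jars.packages": ",".join([
        "com.yammer.metrics:metrics-core:2.2.0",
        "org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2",
    ])
}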

Upvotes: 2
