Reputation: 91
I'm running Spark version 2.3.0.2.6.5.1175-1 with Python 3.6.8 on Ambari. When I submit the application, I get the following logs in stderr:
22/06/15 12:29:31 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
Exception in thread "Thread-10" java.lang.NoClassDefFoundError: kafka/common/TopicAndPartition
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    at java.lang.Class.privateGetPublicMethods(Class.java:2902)
    at java.lang.Class.getMethods(Class.java:1615)
    at py4j.reflection.ReflectionEngine.getMethodsByNameAndLength(ReflectionEngine.java:345)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:305)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: kafka.common.TopicAndPartition
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 12 more
22/06/15 12:29:33 ERROR ApplicationMaster: User application exited with status 1
The following are the stdout logs:
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1062, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 908, in send_command
    response = connection.send_command(command)
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1067, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
Traceback (most recent call last):
  File "read_stream.py", line 13, in <module>
    stream.start(callback=callback)
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/__pyfiles__/OwlsenseStream.py", line 125, in start
    output = self.readStream().foreachRDD(lambda rdd: self.process(rdd, callback))
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/__pyfiles__/OwlsenseStream.py", line 70, in readStream
    messageHandler=lambda x: (x.topic, x.message))
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/pyspark.zip/pyspark/streaming/kafka.py", line 150, in createDirectStream
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/py4j-0.10.6-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o93.createDirectStreamWithMessageHandler
I'm providing the following JAR files:
spark-sql-kafka-0-10_2.11-2.3.0.jar
spark-streaming-kafka-0-8_2.11-2.3.0.jar
metrics-core-2.2.0.jar
Is this some kind of configuration issue, or is there something wrong in the code?
Edit: I am using Livy to submit the job to the cluster. Below is the POST request code:
import json

import requests

# stream and IP_ADDRESS are defined elsewhere in the application
headers = {
    'X-Requested-By': 'admin',
    'Content-Type': 'application/json',
}
data = {
    "numExecutors": stream['executors'],
    "executorCores": stream['cores'],
    "executorMemory": stream['memory'],
    "driverMemory": "2g",
    "file": stream['path'],
    "queue": "default",
    "pyFiles": [
        "hdfs:///user/bds/elastic_search/es_pyspark_handler.py",
        "hdfs:///user/bds/realtime/OwlsenseStream.py"
    ],
    "conf": {
        "spark.jars.packages": "org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,com.yammer.metrics:metrics-core:2.2.0"
    },
    "name": stream['name']
}
data = json.dumps(data)
response = requests.post(url=f"http://{IP_ADDRESS}:8998/batches", headers=headers, data=data,
                         verify=False)
Upvotes: 0
Views: 717
Reputation: 191733
You also need to add the kafka-clients dependency.
And don't add JARs manually; use the packages option so they are downloaded from Maven. You also don't need both Spark Streaming and Spark Structured Streaming; pick only one...
from pyspark.sql import SparkSession

# TODO: Ensure these match the cluster environment
scala_version = '2.11'
spark_version = '2.3.0'
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.2.0'  # TODO: match Kafka version
]

spark = SparkSession.builder \
    .master("local") \
    .appName("kafka-example") \
    .config("spark.jars.packages", ",".join(packages)) \
    .getOrCreate()
Also, HDP 2.6.5 is very old, and out of support. I suggest you upgrade to something else; for example, use Apache Zeppelin or Jupyter to run Spark notebooks.
Upvotes: 1
Reputation: 91
I was unable to run the code on those versions; there was probably something wrong with the JAR versions. I changed the versions to the following:
Spark version 2.3.2
Kafka 2.8
Pyspark 2.3.2
Python 3.7
Java 1.8.*
I used the following packages:
--packages com.yammer.metrics:metrics-core:2.2.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2
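If you submit through Livy as in the question, the same coordinates would go into the request's conf block; this is a sketch assuming the rest of the request body stays as posted above.

# Sketch: spark.jars.packages value matching the --packages line above,
# for the "conf" field of the Livy batch request.
conf = {
    "spark.jars.packages": (
        "com.yammer.metrics:metrics-core:2.2.0,"
        "org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2"
    )
}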
Upvotes: 2