I'm trying to read a stream from Kafka using PySpark.
The stack I'm working with:
And here is the error I'm getting:
SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 11) (192.168.218.135 executor 0): java.lang.ClassNotFoundException: org.apache.spark.sql.kafka010.KafkaSourceRDDPartition
at org.apache.spark.executor.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:124)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:467)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:71)
at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2034)
at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1898)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2224)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:509)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:467)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:579)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Here is my code:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("test2").remote("sc://spark-master-svc.data-lab-system:15002").config("spark.executor.memory", "512m").getOrCreate()
kafka_service = "kafka-kafka-bootstrap.data-lab-system:9092"
topic_name = "pre-proccessing_posts_stream"
group_id = "spark-consumer-group3"
df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_service) \
    .option("subscribe", topic_name) \
    .option("kafka.group.id", group_id) \
    .load()
# .option("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")

df.printSchema()  # works, and I get output from it
df.show()  # here I get the error
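For context on the commented-out line: spark.jars.packages is a Spark configuration, not a reader option, so it would normally be set when the session is built. A minimal sketch of that, assuming Spark 3.5.0 built with Scala 2.12 (whether a client-side config like this actually reaches the Spark Connect server's executors is exactly what seems to be failing here):

from pyspark.sql import SparkSession

# Sketch only: the Kafka connector supplied as a session config rather than
# a reader option. Package coordinates assume Spark 3.5.0 / Scala 2.12.
spark = (
    SparkSession.builder
    .appName("test2")
    .remote("sc://spark-master-svc.data-lab-system:15002")
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")
    .getOrCreate()
)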
This error only occurs when Spark Connect is connected to the Spark cluster. If it runs as a single node, everything works fine.
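For comparison, this is roughly the single-node setup where the same read succeeds (a sketch, assuming a plain local-mode session; in local mode the driver resolves spark.jars.packages and also runs the tasks itself, so the Kafka classes are always on the classpath):

from pyspark.sql import SparkSession

# Sketch of the working single-node variant (assumed local mode, same
# package coordinates as above).
spark_local = (
    SparkSession.builder
    .appName("test2-local")
    .master("local[*]")
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")
    .getOrCreate()
)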