Jim Macaulay

Reputation: 5141

PySpark Kafka py4j.protocol.Py4JJavaError: An error occurred while calling o28.load

While converting Kafka messages to a DataFrame, I am getting an error when passing the packages as an argument.

from pyspark.sql import SparkSession, Row
from pyspark.context import SparkContext
from kafka import KafkaConsumer
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars spark-sql-kafka-0-10_2.11-2.0.2.jar,spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar pyspark-shell'

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "Jim_Topic") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

py4j.protocol.Py4JJavaError: An error occurred while calling o28.load. : java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.kafka010.KafkaSourceProvider could not be instantiated

Upvotes: 1

Views: 2403

Answers (2)

Jim Macaulay

Reputation: 5141

Defining the jars with the configuration below helped me:

spark = SparkSession.builder\
  .appName("Kafka Spark")\
  .config("spark.jars", "/C:/Hadoop/Spark/spark-3.0.0-preview2-bin- hadoop2.7/jars/spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar")\
  .config("spark.executor.extraClassPath", "/C:/Hadoop/Spark/spark-3.0.0-preview2-bin-hadoop2.7/jars/spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar")\
  .config("spark.executor.extraLibrary", "/C:/Hadoop/Spark/spark-3.0.0-preview2-bin-hadoop2.7/jars/spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar")\
  .config("spark.driver.extraClassPath", "/C:/Hadoop/Spark/spark-3.0.0-preview2-bin-hadoop2.7/jars/spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar")\
  .getOrCreate()
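
For reference, reading the topic with that session then looks like the snippet below (a minimal sketch, reusing the local broker and Jim_Topic topic from the question):

df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "Jim_Topic") \
  .load()

# Kafka keys and values arrive as binary, so cast them to strings before use
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.show(truncate=False)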

Upvotes: 0

Giorgos Myrianthous

Reputation: 39790

This is happening because the version of spark-sql-kafka does not match the version of Spark you are currently running.

For example, the dependency below would work for Spark 2.4.1:

org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1 

To fix the issue, simply use the version of your Spark at the end of the dependency string (replace x.y.z):

org.apache.spark:spark-sql-kafka-0-10_2.11:x.y.z 
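
A minimal sketch of how to avoid the mismatch altogether is to derive the coordinate from the installed PySpark version and pass it via --packages before the session is created (the Scala suffix _2.11 is an assumption here; Spark 3.x distributions are built against Scala 2.12, so adjust it to your build):

import os
import pyspark

# Assumption: the Scala suffix must match your Spark build
# (_2.11 for Spark 2.x default builds, _2.12 for Spark 3.x)
coord = "org.apache.spark:spark-sql-kafka-0-10_2.11:" + pyspark.__version__

# This must be set before any SparkContext/SparkSession is created
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages {} pyspark-shell'.format(coord)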

Upvotes: 1
