Andre Carneiro

Reputation: 736

pyspark - kafka integration: missing lib

I'm following the instructions in this guide from Databricks to start a project with Kafka:

Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)

The code:

# coding: utf-8
import sys
import os,time
sys.path.append("/usr/local/lib/python2.7/dist-packages")
from pyspark.sql import SparkSession,Row
from pyspark import SparkContext,SQLContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.types import *
import pyspark.sql.functions
import json

spark = SparkSession.builder.appName("Kafka-test").getOrCreate()
spark.sparkContext.setLogLevel('WARN')


trainingSchema = StructType([
  StructField("code",StringType(),True),
  StructField("ean",StringType(),True),
  StructField("name",StringType(),True),
  StructField("description",StringType(),True),
  StructField("category",StringType(),True),
  StructField("attributes",StringType(),True)
])
# `sc` was never defined; take the SparkContext from the session instead
trainingDF = spark.createDataFrame(spark.sparkContext.emptyRDD(), trainingSchema)

broker, topic = ['kafka.partner.stg.some.domain:9092', 'hybris.products']

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka.partner.stg.some.domain:9092") \
    .option("subscribe", "hybris.products") \
    .option("startingOffsets", "earliest") \
    .load()

My Hadoop version is 2.6 and my Spark version is 2.3.0.

The command line with spark-submit is:

spark-submit --jars jars/spark-sql-kafka-0-10_2.11-2.3.0.jar kafka-test-002.py

The error message:

Py4JJavaError: An error occurred while calling o48.load.
: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<init>(KafkaSourceProvider.scala:413)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<clinit>(KafkaSourceProvider.scala)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.validateStreamOptions(KafkaSourceProvider.scala:360)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:64)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:231)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:94)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:94)
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:33)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:170)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

As you can check in the guide I linked above, the JAR file I'm passing is exactly the one it names, so I have no idea why this is happening. Is there another module that isn't mentioned? I'm really lost here.

Upvotes: 0

Views: 1273

Answers (1)

Mariusz

Reputation: 13926

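The JAR you mention doesn't bundle its dependencies, in particular kafka-clients, which is where org.apache.kafka.common.serialization.ByteArrayDeserializer lives. Use --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 instead of --jars, so that the transitive dependencies are resolved as well (as described in the Deploying section of the docs: https://spark.apache.org/docs/2.3.0/structured-streaming-kafka-integration.html#deploying)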
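
If you want to confirm this yourself, here is a minimal sketch that checks whether a JAR contains a given class and builds the spark-submit argument list with --packages. The helper names jar_contains_class and submit_command are hypothetical, made up for this illustration; the package coordinates and the class name are taken from the question and the answer above. It assumes nothing beyond the Python standard library and does not launch Spark.

```python
import zipfile

# Coordinates quoted in the answer above: group:artifact_scalaVersion:sparkVersion.
KAFKA_SQL_PACKAGE = "org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0"

# Class named in the NoClassDefFoundError from the question.
MISSING_CLASS = "org.apache.kafka.common.serialization.ByteArrayDeserializer"


def jar_contains_class(jar_path, class_name):
    """Return True if the JAR (a zip archive) holds the compiled class.

    Hypothetical helper: a class a.b.C is stored in a JAR as a/b/C.class.
    """
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()


def submit_command(script, package=KAFKA_SQL_PACKAGE):
    """Build the spark-submit argument list that uses --packages.

    Hypothetical helper: it only assembles the arguments, it runs nothing.
    """
    return ["spark-submit", "--packages", package, script]


if __name__ == "__main__":
    # Print the command line suggested in the answer for the question's script.
    print(" ".join(submit_command("kafka-test-002.py")))
```

Calling jar_contains_class("jars/spark-sql-kafka-0-10_2.11-2.3.0.jar", MISSING_CLASS) on the JAR from the question should return False if the diagnosis above is right, since the class ships in the separate kafka-clients artifact.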

Upvotes: 2
