Reputation: 23
I'm trying to run a zeppelin notebook that contains spark's Structured Streaming example with Kafka connector.
>kafka is up and running on localhost port 9092
>from zeppelin notebook, sc.version returns String = 2.0.2
Here is my environment:
kafka: kafka_2.10-0.10.1.0
zeppelin: zeppelin-0.6.2-bin-all
spark: spark-2.0.2-bin-hadoop2.7
Here is the code in my zeppelin notebook:
import org.apache.enter code herespark.sql.functions.{explode, split}
// Setup connection to Kafka val kafka = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092")
// comma separated list of broker:host
.option("subscribe", "twitter")
// comma separated list of topics
.option("startingOffsets", "latest")
// read data from the end of the stream .load()
Here is the error I'm getting when I run the notebook:
import org.apache.spark.sql.functions.{explode, split} java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148) at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:79) at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:79) at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:218) at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:80) at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:80) at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30) at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124) ... 86 elided Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132) at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132) at scala.util.Try$.apply(Try.scala:192)
Any help advice would be greatly appreciated.
Thnx
Upvotes: 2
Views: 2017
Reputation: 591
This solution has been tested in zeppelin version 0.10.1.
You need to add dependencies of your code. It can be done with zeppelin UI. Go to Interpreter panel (http://localhost:8080/#/interpreter) and in spark section, under Dependencies you can add artifact of each dependency. If by adding spark-sql-kafka you ran into other dependency issues, add all packages the spark-sql-kafka needs. You can find them in Compile Dependencies section of it's maven repository.
I'm working with spark version 3.0.0 and scala version 2.12 and I was trying to integrate spark with kafka. I managed to get passed this issue by adding all the bellow artifacts:
org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0
com.google.code.findbugs:jsr305:3.0.0
org.apache.spark:spark-tags_2.12:3.3.0
org.apache.spark:spark-token-provider-kafka-0-10_2.12:3.3.0
org.apache.kafka:kafka-clients:3.0.0
Upvotes: 0
Reputation: 41
You probably have figured this out already but putting in the answer for others, you have to add the following to zeppelin-env.sh.j2
SPARK_SUBMIT_OPTIONS=--packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.0
along with potentially other dependencies if you are using the kafka client:
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0,org.apache.spark:spark-sql_2.11:2.1.0,org.apache.kafka:kafka_2.11:0.10.0.1,org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.0,org.apache.kafka:kafka-clients:0.10.0.1
Upvotes: 1