ram reddy

Reputation: 25

Kafka with Spark Structured Streaming

We are using Kafka with Spark Structured Streaming, and on execution we get the error below:

    Ivy Default Cache set to: /root/.ivy2/cache
 The jars for the packages stored in: /root/.ivy2/jars
 :: loading settings :: url = jar:file:/usr/hdp/2.6.3.0-235/spark2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
 org.apache.spark#spark-sql-kafka-0-10_2.11 added as a dependency
 org.apache.kafka#kafka-clients added as a dependency
 :: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
    confs: [default]
    found org.apache.spark#spark-sql-kafka-0-10_2.11;2.2.0 in central
    found org.apache.spark#spark-tags_2.11;2.2.0 in local-m2-cache
    found org.spark-project.spark#unused;1.0.0 in local-m2-cache
    found org.apache.kafka#kafka-clients;0.10.1.0 in local-m2-cache
    found net.jpountz.lz4#lz4;1.3.0 in local-m2-cache
    found org.xerial.snappy#snappy-java;1.1.2.6 in local-m2-cache
    found org.slf4j#slf4j-api;1.7.21 in local-m2-cache
 :: resolution report :: resolve 3640ms :: artifacts dl 20ms
    :: modules in use:
    net.jpountz.lz4#lz4;1.3.0 from local-m2-cache in [default]
    org.apache.kafka#kafka-clients;0.10.1.0 from local-m2-cache in [default]
    org.apache.spark#spark-sql-kafka-0-10_2.11;2.2.0 from central in [default]
    org.apache.spark#spark-tags_2.11;2.2.0 from local-m2-cache in [default]
    org.slf4j#slf4j-api;1.7.21 from local-m2-cache in [default]
    org.spark-project.spark#unused;1.0.0 from local-m2-cache in [default]
    org.xerial.snappy#snappy-java;1.1.2.6 from local-m2-cache in [default]
    :: evicted modules:
    org.apache.kafka#kafka-clients;0.10.0.1 by [org.apache.kafka#kafka-clients;0.10.1.0] in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   8   |   2   |   2   |   1   ||   7   |   0   |
    ---------------------------------------------------------------------
 :: retrieving :: org.apache.spark#spark-submit-parent
    confs: [default]
    0 artifacts copied, 7 already retrieved (0kB/30ms)
 18/03/14 15:52:02 INFO SparkContext: Running Spark version 2.2.0.2.6.3.0-235
 18/03/14 15:52:02 INFO SparkContext: Submitted application: StructuredKafkaWordCount
...
 18/03/14 15:52:05 INFO SparkContext: Added JAR file:/usr/hdp/2.6.3.0-235/spark2/examples/jars/spark-examples_2.11-2.2.0.2.6.3.0-235.jar at spark://172.16.10.53:31702/jars/spark-examples_2.11-2.2.0.2.6.3.0-235.jar with timestamp 1521022925004
 18/03/14 15:52:05 INFO SparkContext: Added JAR file:/usr/hdp/2.6.3.0-235/spark2/examples/jars/scopt_2.11-3.3.0.jar at spark://172.16.10.53:31702/jars/scopt_2.11-3.3.0.jar with timestamp 1521022925006
 18/03/14 15:52:05 INFO SparkContext: Added JAR file:/usr/hdp/2.6.3.0-235/spark2/examples/jars/spark-assembly_2.10-0.9.0-incubating.jar at spark://172.16.10.53:31702/jars/spark-assembly_2.10-0.9.0-incubating.jar with timestamp 1521022925006
 18/03/14 15:52:05 INFO SparkContext: Added JAR file:/root/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.2.0.jar at spark://172.16.10.53:31702/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.2.0.jar with timestamp 1521022925006
 18/03/14 15:52:05 INFO SparkContext: Added JAR file:/root/.ivy2/jars/org.apache.kafka_kafka-clients-0.10.1.0.jar at spark://172.16.10.53:31702/jars/org.apache.kafka_kafka-clients-0.10.1.0.jar with timestamp 1521022925006
 18/03/14 15:52:05 INFO SparkContext: Added JAR file:/root/.ivy2/jars/org.apache.spark_spark-tags_2.11-2.2.0.jar at spark://172.16.10.53:31702/jars/org.apache.spark_spark-tags_2.11-2.2.0.jar with timestamp 1521022925006
 18/03/14 15:52:05 INFO SparkContext: Added JAR file:/root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar at spark://172.16.10.53:31702/jars/org.spark-project.spark_unused-1.0.0.jar with timestamp 1521022925007
 18/03/14 15:52:05 INFO SparkContext: Added JAR file:/root/.ivy2/jars/net.jpountz.lz4_lz4-1.3.0.jar at spark://172.16.10.53:31702/jars/net.jpountz.lz4_lz4-1.3.0.jar with timestamp 1521022925007
 18/03/14 15:52:05 INFO SparkContext: Added JAR file:/root/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar at spark://172.16.10.53:31702/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar with timestamp 1521022925007
 18/03/14 15:52:05 INFO SparkContext: Added JAR file:/root/.ivy2/jars/org.slf4j_slf4j-api-1.7.21.jar at spark://172.16.10.53:31702/jars/org.slf4j_slf4j-api-1.7.21.jar with timestamp 1521022925007
 18/03/14 15:52:05 INFO Executor: Starting executor ID driver on host localhost
 18/03/14 15:52:05 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 11138.
 18/03/14 15:52:05 INFO NettyBlockTransferService: Server created on 172.16.10.53:11138
 18/03/14 15:52:05 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
 18/03/14 15:52:05 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 172.16.10.53, 11138, None)
 18/03/14 15:52:05 INFO BlockManagerMasterEndpoint: Registering block manager 172.16.10.53:11138 with 366.3 MB RAM, BlockManagerId(driver, 172.16.10.53, 11138, None)
 18/03/14 15:52:05 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 172.16.10.53, 11138, None)
 18/03/14 15:52:05 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 172.16.10.53, 11138, None)
 18/03/14 15:52:05 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@10bea4{/metrics/json,null,AVAILABLE,@Spark}
 18/03/14 15:52:07 INFO EventLoggingListener: Logging events to hdfs:///spark2-history/local-1521022925116
 18/03/14 15:52:07 INFO SharedState: loading hive config file: file:/etc/spark2/2.6.3.0-235/0/hive-site.xml
 18/03/14 15:52:07 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/usr/hdp/2.6.3.0-235/spark2/bin/spark-warehouse/').
 18/03/14 15:52:07 INFO SharedState: Warehouse path is 'file:/usr/hdp/2.6.3.0-235/spark2/bin/spark-warehouse/'.
 18/03/14 15:52:07 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@e700eba{/SQL,null,AVAILABLE,@Spark}
 18/03/14 15:52:07 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@7186b202{/SQL/json,null,AVAILABLE,@Spark}
 18/03/14 15:52:07 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@3d88e6b9{/SQL/execution,null,AVAILABLE,@Spark}
 18/03/14 15:52:07 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@208205ed{/SQL/execution/json,null,AVAILABLE,@Spark}
 18/03/14 15:52:07 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2173a742{/static/sql,null,AVAILABLE,@Spark}
 18/03/14 15:52:09 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
 18/03/14 15:52:13 INFO StreamExecution: Starting [id = 7880cf41-0bbc-4b40-a284-c41f9dcbfbc8, runId = 5636831e-aad8-43e4-8d99-9a4b27b69b9f]. Use /tmp/temporary-a86e0bc9-99fd-45dd-b38a-4c5fc10def22 to store the query checkpoint.
 18/03/14 15:52:13 ERROR StreamExecution: Query [id = 7880cf41-0bbc-4b40-a284-c41f9dcbfbc8, runId = 5636831e-aad8-43e4-8d99-9a4b27b69b9f] terminated with error
 java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:74)
    at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:246)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:158)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:155)
    at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
    at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:155)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:153)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
    at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan$lzycompute(StreamExecution.scala:153)
    at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan(StreamExecution.scala:147)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:276)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
 Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 47 more
 Exception in thread "main" Exception in thread "stream execution thread for [id = 7880cf41-0bbc-4b40-a284-c41f9dcbfbc8, runId = 5636831e-aad8-43e4-8d99-9a4b27b69b9f]" org.apache.spark.sql.streaming.StreamingQueryException: org/apache/kafka/common/serialization/ByteArrayDeserializer
 === Streaming Query ===
 Identifier: [id = 7880cf41-0bbc-4b40-a284-c41f9dcbfbc8, runId = 5636831e-aad8-43e4-8d99-9a4b27b69b9f]
 Current Committed Offsets: {}
 Current Available Offsets: {}

 Current State: INITIALIZING
 Thread State: RUNNABLE
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:343)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
 Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:74)
    at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:246)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:158)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:155)
    at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
    at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:155)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:153)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
    at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan$lzycompute(StreamExecution.scala:153)
    at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan(StreamExecution.scala:147)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:276)
    ... 1 more
 Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 47 more
 java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:74)
    at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:246)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:158)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:155)
    at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
    at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:155)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:153)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
    at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan$lzycompute(StreamExecution.scala:153)
    at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan(StreamExecution.scala:147)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:276)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
 Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 47 more
 18/03/14 15:52:13 INFO SparkContext: Invoking stop() from shutdown hook
 18/03/14 15:52:13 INFO AbstractConnector: Stopped Spark@30221a6b{HTTP/1.1,[http/1.1]}{0.0.0.0:4041}
 18/03/14 15:52:13 INFO SparkUI: Stopped Spark web UI at http://172.16.10.53:4041
 18/03/14 15:52:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
 18/03/14 15:52:14 INFO MemoryStore: MemoryStore cleared
 18/03/14 15:52:14 INFO BlockManager: BlockManager stopped
 18/03/14 15:52:14 INFO BlockManagerMaster: BlockManagerMaster stopped
 18/03/14 15:52:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
 18/03/14 15:52:14 INFO SparkContext: Successfully stopped SparkContext
 18/03/14 15:52:14 INFO ShutdownHookManager: Shutdown hook called
 18/03/14 15:52:14 INFO ShutdownHookManager: Deleting directory /tmp/spark-bfc1f921-8877-4dc8-81ed-dfcba6da84c0
 18/03/14 15:52:14 INFO ShutdownHookManager: Deleting directory /tmp/temporary-a86e0bc9-99fd-45dd-b38a-4c5fc10def22

Before this, we imported spark-sql-kafka-0-10_2.11-2.0.2.jar in spark-shell. We also tried to run the Hortonworks example using the ./run-example command.
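For reference, a minimal sketch of what the StructuredKafkaWordCount job does is below (the broker and topic names are placeholders, not the ones we actually use). The readStream on the kafka source is the point where KafkaSourceProvider.createSource tries to load org.apache.kafka.common.serialization.ByteArrayDeserializer:

    import org.apache.spark.sql.SparkSession

    object StructuredKafkaWordCount {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder
          .appName("StructuredKafkaWordCount")
          .getOrCreate()
        import spark.implicits._

        // Read from Kafka; this is where the missing ByteArrayDeserializer is loaded.
        val lines = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker1:9092") // placeholder broker
          .option("subscribe", "test-topic")                 // placeholder topic
          .load()
          .selectExpr("CAST(value AS STRING)")
          .as[String]

        // Standard streaming word count over the Kafka message values.
        val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

        val query = wordCounts.writeStream
          .outputMode("complete")
          .format("console")
          .start()

        query.awaitTermination()
      }
    }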

Versions:

Upvotes: 0

Views: 6854

Answers (4)

Norman Bai

Reputation: 155

Spark ships the spark-streaming-kafka-0-8 package with its distribution, so when you add the spark-sql-kafka-0-10 Structured Streaming package, its Kafka client conflicts with the one pulled in by the 0.8 streaming package.

That is why, if you follow the other answers to this question, you may find they don't work at all.

It is also why you can include the spark-sql-kafka-0-10 package in your pom, or pass it with '--jars' or 'spark.driver.extraClassPath', and still get a ClassNotFound exception.

The only thing you have to do is (see the consolidated sketch after this list):

  1. Delete the jar file in your path: ${SPARK_HOME}/jars/spark-streaming-kafka-0-8_2.11.jar.

  2. Use --jars ${your_lib}/spark-sql-kafka-0-10_2.11.jar,${your_lib}/kafka-clients-${your_version}.jar with your spark-submit command.

  3. Besides, if you have set spark.yarn.jars and have put your Spark libs in that directory, you may also have to delete spark-streaming-kafka-0-8_2.11.jar on HDFS.
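Putting the steps together, it looks roughly like this (paths, versions, and the application class/jar are placeholders; adjust to your install):

    # 1. remove the bundled 0.8 streaming-kafka jar (path is a placeholder)
    rm ${SPARK_HOME}/jars/spark-streaming-kafka-0-8_2.11*.jar

    # 2. submit with the structured-streaming Kafka source and a matching client
    spark-submit \
      --jars ${your_lib}/spark-sql-kafka-0-10_2.11-2.2.0.jar,${your_lib}/kafka-clients-0.10.1.0.jar \
      --class your.main.Class \
      your-application.jar

    # 3. if spark.yarn.jars points at an HDFS directory that also holds the 0.8 jar
    hdfs dfs -rm /path/to/your/spark-jars/spark-streaming-kafka-0-8_2.11*.jar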

Upvotes: 2

Ajax1986

Reputation: 385

@Souvik is correct, but the jar he specified is not the kafka-clients jar (which is also required). The following is a sample spark-submit statement:

spark-submit --jars spark-sql-kafka-0-10_2.11-2.4.0.jar,kafka-clients-0.10.2.2.jar your-file.py

Upvotes: 1

Deepansh Goyal

Reputation: 41

It needs two jars, 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0' and 'org.apache.kafka:kafka-clients:0.10.1.0'; import both of them.

Instead of specifying the jars manually, you can use:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,org.apache.kafka:kafka-clients:0.10.1.0

It will download the jars automatically.
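The --packages line above still needs the application itself to run; a fuller form (the class and jar names below are placeholders) would be:

    spark-submit \
      --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,org.apache.kafka:kafka-clients:0.10.1.0 \
      --class your.main.Class \
      your-application.jar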

Upvotes: 3

Souvik

Reputation: 377

It seems that you're missing the kafka-clients jar. Pass the spark-sql-kafka-0-10_2.11-2.0.2.jar during spark-submit:

spark-submit --jars /yourpath/spark-sql-kafka-0-10_2.11-2.0.2.jar

Upvotes: 0
