Reputation: 31
I use Spark Streaming with Apache Kafka.
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

val events = directKafkaStream.flatMap(x => {
  val data = JSONObject.fromObject(x._2)
  Some(data)
})

val dbIndex = 1
val clickHashKey = "app::users::click"

val userClicks = events.map(x => (x.getString("userid"), x.getInt("click_count"))).reduceByKey(_ + _)

userClicks.foreachRDD(partitionOfRecords => partitionOfRecords.foreach(pair => {
  val userid = pair._1
  val clickCount = pair._2
  val jedis = RedisClient.pool.getResource
  jedis.select(dbIndex)
  jedis.hincrBy(clickHashKey, userid, clickCount)
  RedisClient.pool.returnResource(jedis)
}))

ssc.start()
ssc.awaitTermination()
It fails with the following exception:
16/12/11 14:17:20 INFO DAGScheduler: ShuffleMapStage 146 (map at UserClickCountAnalysis.scala:75) failed in 0.068 s
16/12/11 14:17:20 INFO DAGScheduler: Job 73 failed: foreachRDD at UserClickCountAnalysis.scala:76, took 0.073045 s
16/12/11 14:17:20 ERROR JobScheduler: Error running job streaming job 1481437040000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 146.0 failed 4 times, most recent failure: Lost task 0.3 in stage 146.0 (TID 295, 10.211.55.12): java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaRDDPartition
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:274)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:95)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
16/12/11 14:17:25 INFO JobScheduler: Added jobs for time 1481437045000 ms
16/12/11 14:17:25 INFO JobScheduler: Starting job streaming job 1481437045000 ms.0 from job set of time 1481437045000 ms
16/12/11 14:17:25 INFO SparkContext: Starting job: foreachRDD at UserClickCountAnalysis.scala:76
16/12/11 14:17:25 INFO DAGScheduler: Registering RDD 298 (map at UserClickCountAnalysis.scala:75)
16/12/11 14:17:25 INFO DAGScheduler: Got job 74 (foreachRDD at UserClickCountAnalysis.scala:76) with 2 output partitions (allowLocal=false)
16/12/11 14:17:25 INFO DAGScheduler: Final stage: ResultStage 149(foreachRDD at UserClickCountAnalysis.scala:76)
16/12/11 14:17:25 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 148)
16/12/11 14:17:25 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 148)
16/12/11 14:17:25 INFO DAGScheduler: Submitting ShuffleMapStage 148 (MapPartitionsRDD[298] at map at UserClickCountAnalysis.scala:75), which has no missing parents
16/12/11 14:17:25 INFO MemoryStore: ensureFreeSpace(3880) called with curMem=42510, maxMem=2061647216
16/12/11 14:17:25 INFO MemoryStore: Block broadcast_74 stored as values in memory (estimated size 3.8 KB, free 1966.1 MB)
16/12/11 14:17:25 INFO MemoryStore: ensureFreeSpace(2194) called with curMem=46390, maxMem=2061647216
16/12/11 14:17:25 INFO MemoryStore: Block broadcast_74_piece0 stored as bytes in memory (estimated size 2.1 KB, free 1966.1 MB)
16/12/11 14:17:25 INFO BlockManagerInfo: Added broadcast_74_piece0 in memory on 192.168.1.103:56006 (size: 2.1 KB, free: 1966.1 MB)
16/12/11 14:17:25 INFO SparkContext: Created broadcast 74 from broadcast at DAGScheduler.scala:874
16/12/11 14:17:25 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 148 (MapPartitionsRDD[298] at map at UserClickCountAnalysis.scala:75)
16/12/11 14:17:25 INFO TaskSchedulerImpl: Adding task set 148.0 with 1 tasks
16/12/11 14:17:25 INFO TaskSetManager: Starting task 0.0 in stage 148.0 (TID 296, 10.211.55.12, ANY, 1271 bytes)
16/12/11 14:17:25 WARN TaskSetManager: Lost task 0.0 in stage 148.0 (TID 296, 10.211.55.12): java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaRDDPartition
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
My pom.xml is below:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>phnasis</groupId>
  <artifactId>sparkstreamingUserClick</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.10 -->
    <!--<dependency>-->
      <!--<groupId>org.apache.spark</groupId>-->
      <!--<artifactId>spark-core_2.10</artifactId>-->
      <!--<version>1.4.0</version>-->
    <!--</dependency>-->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.4.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.10 -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.4.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.10 -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.2.1</version>
    </dependency>
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.9.0</version>
      <type>jar</type>
      <scope>compile</scope>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <testSourceDirectory>src/test/java</testSourceDirectory>
    <plugins>
      <!--
        Bind the maven-assembly-plugin to the package phase;
        this will create a jar file without the storm dependencies,
        suitable for deployment to a cluster.
      -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass></mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
Upvotes: 3
Views: 3053
Reputation: 74779
Given your pom.xml with the following:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>1.4.0</version>
</dependency>
I guess that the issue is in the way you submit your Spark Streaming application for execution.
You have to include the dependency on the Spark environment's classpath in one of two possible ways (which one is available strongly depends on the version of Spark you use):
1. spark-submit with --packages, which takes a comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths, e.g.

   ./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.0.2
2. (not recommended) Assemble the Spark dependency into your jar, which in the end becomes an uber-jar with this and the other dependencies (unless you exclude them with the provided scope); see the sketch right after this list.
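For option 2, a minimal sanity check against the question's pom.xml (a sketch; the jar name assumes the assembly plugin's default jar-with-dependencies suffix, and <your.main.Class> is a placeholder since the pom leaves mainClass empty):

# Build the uber-jar; the pom already binds maven-assembly-plugin to the package phase.
mvn clean package

# The class missing in the stack trace must end up inside the jar you actually submit.
jar tf target/sparkstreamingUserClick-1.0-SNAPSHOT-jar-with-dependencies.jar | grep kafka/KafkaRDDPartition

# Submit the uber-jar, not the thin sparkstreamingUserClick-1.0-SNAPSHOT.jar.
./bin/spark-submit --class <your.main.Class> target/sparkstreamingUserClick-1.0-SNAPSHOT-jar-with-dependencies.jar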
The recommended approach is option 1, but it requires a recent Spark version (with --packages support) and, because of the Spark version change, also a different spark-streaming-kafka module, which was split into 0.8 and 0.10 variants.
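For the Spark 1.4.0 / Scala 2.10 artifacts declared in the question's pom.xml, option 1 would look roughly like this (a sketch; the main class and application jar are placeholders, and it assumes your spark-submit supports --packages):

./bin/spark-submit \
  --class <your.main.Class> \
  --packages org.apache.spark:spark-streaming-kafka_2.10:1.4.0 \
  target/sparkstreamingUserClick-1.0-SNAPSHOT.jar

spark-submit then resolves the coordinate from Maven Central and puts it on both the driver and executor classpaths, which is where the ClassNotFoundException is being thrown.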
Upvotes: 1
Reputation:
Spark Streaming requires a separate artifact for Kafka. In your case:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>1.4.0</version>
</dependency>
PROTIP: You should really consider updating your Spark version to the latest release (2.0.2) whenever you get a chance.
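If you do upgrade, the 0.8 connector is the one that keeps the createDirectStream[String, String, StringDecoder, StringDecoder] API used in the question, so a submit could then look like this (a sketch; the Scala suffix has to match whatever your build produces, and the jar/class names are placeholders):

./bin/spark-submit \
  --class <your.main.Class> \
  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 \
  target/sparkstreamingUserClick-1.0-SNAPSHOT.jar

The spark-streaming and Kafka connector artifacts in the pom have to move to the same Spark version (and Scala suffix) as the cluster at the same time.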
Upvotes: 0