Reputation: 2230
I am using Spark's multiple input stream readers to read messages from Kafka, and I am getting the error below. If I don't use multiple input stream readers, I don't get any error. I need to read in parallel for performance; for testing purposes I am using only one stream.
Error
java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = test, partition = 0, offset = 120, CreateTime = -1, checksum = 2372777361, serialized key size = -1, serialized value size = 48, key = null, value = 10051,2018-03-15 17:12:24+0000,Bentonville,Gnana))
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:134)
at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:239)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
18/03/15 17:12:24 ERROR TaskSetManager: Task 0.0 in stage 470.0 (TID 470) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
Code:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.Success
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
object ParallelStreamJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkHelper.getOrCreateSparkSession()
    val ssc = new StreamingContext(spark.sparkContext, Milliseconds(50))
    val kafkaStream = {
      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "localhost:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "use_a_separate_group_id_for_each_stream",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )
      val topics = Array("test")
      val numPartitionsOfInputTopic = 1
      val streams = (1 to numPartitionsOfInputTopic) map { _ =>
        KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](topics, kafkaParams)
        )
      }
      val unifiedStream = ssc.union(streams)
      val sparkProcessingParallelism = 1
      unifiedStream.repartition(sparkProcessingParallelism)
    }
    kafkaStream.foreachRDD(rdd => {
      rdd.foreach(conRec => {
        println(conRec.value())
      })
    })
    println(" Spark parallel reader is ready !!!")
    ssc.start()
    ssc.awaitTermination()
  }
}
sbt
scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"
val connectorVersion = "2.0.7"
val kafka_stream_version = "1.6.3"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion,
  "com.datastax.spark" %% "spark-cassandra-connector" % connectorVersion,
  "org.apache.kafka" %% "kafka" % "0.10.1.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion
)
How do I resolve this issue?
Upvotes: 1
Views: 1345
Reputation: 61
First of all, if you only have one topic you shouldn't create multiple Kafka streams. Since you are using the direct approach, Spark automatically creates as many tasks as there are Kafka partitions for the topic, so it already takes care of parallelizing your reads. Try calling repartition() at the RDD level instead of repartitioning the DStream itself, as sketched below.
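A minimal sketch of that approach, assuming the same ssc, topics and kafkaParams as in the question (the parallelism value 4 is only an example):

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD { rdd =>
  rdd.map(_.value())   // extract the String value first; ConsumerRecord itself is not serializable
    .repartition(4)    // repartition the RDD, not the DStream
    .foreach(println)
}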
Upvotes: 0
Reputation: 1110
The issue is clear: java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord. The ConsumerRecord class doesn't implement Serializable. Try extracting the value field of each ConsumerRecord before the foreachRDD operation: kafkaStream.map(_.value()).
Update 1: The above fix doesn't work because the exception happens at ssc.union(streams). ssc.union(streams) requires data transfer between nodes, so it must serialize the records. You can fix the issue by extracting the value field with map before the union operation:
KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
).map(_.value())
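For context, a sketch of how the question's stream-building block could look with that change applied (variable names taken from the question, untested):

val streams = (1 to numPartitionsOfInputTopic).map { _ =>
  KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  ).map(_.value())   // DStream[String] from here on; no ConsumerRecord crosses the shuffle
}

val unifiedStream = ssc.union(streams)   // unions DStream[String], which serializes fine
val kafkaStream = unifiedStream.repartition(sparkProcessingParallelism)

kafkaStream.foreachRDD(rdd => rdd.foreach(println))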
Upvotes: 2