Gnana

Reputation: 2230

Spark Parallel Stream - object not serializable

I am using multiple Spark input stream readers to read messages from Kafka, and I am getting the error below. If I don't use multiple input stream readers, there is no error. I need the parallel approach for performance; for testing purposes I am using only one stream.

Error

java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = test, partition = 0, offset = 120, CreateTime = -1, checksum = 2372777361, serialized key size = -1, serialized value size = 48, key = null, value = 10051,2018-03-15 17:12:24+0000,Bentonville,Gnana))
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:134)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:239)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
18/03/15 17:12:24 ERROR TaskSetManager: Task 0.0 in stage 470.0 (TID 470) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord

Code:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.Success
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object ParallelStreamJob {

  def main(args: Array[String]): Unit = {
    val spark = SparkHelper.getOrCreateSparkSession()
    val ssc = new StreamingContext(spark.sparkContext, Milliseconds(50))
    val kafkaStream = {

      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "localhost:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "use_a_separate_group_id_for_each_stream",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )

      val topics = Array("test")
      val numPartitionsOfInputTopic = 1
      val streams = (1 to numPartitionsOfInputTopic) map { _ =>
        KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](topics, kafkaParams)
        )
      }
      val unifiedStream = ssc.union(streams)
      val sparkProcessingParallelism = 1 
      unifiedStream.repartition(sparkProcessingParallelism)
    }

    kafkaStream.foreachRDD(rdd=> {
      rdd.foreach(conRec=> {
        println(conRec.value())
      })
    })

    println(" Spark parallel reader is ready !!!")

    ssc.start()
    ssc.awaitTermination()

  }
}

sbt

scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"
val connectorVersion = "2.0.7"
val kafka_stream_version = "1.6.3"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion,
  "com.datastax.spark" %% "spark-cassandra-connector" % connectorVersion,
  "org.apache.kafka" %% "kafka" % "0.10.1.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion
)

How can I resolve this issue?

Upvotes: 1

Views: 1345

Answers (2)

USY

Reputation: 61

First of all, if you have one topic you shouldn't create multiple Kafka streams: with the direct approach, Spark automatically creates as many tasks as there are Kafka partitions for the topic, so it takes care of parallelizing your work. Try using repartition() at the RDD level instead of repartitioning the DStream itself, as sketched below.
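For reference, a minimal sketch of that suggestion, reusing the stream setup from the question (ssc, topics and kafkaParams are the question's values; the partition count of 4 is an arbitrary example):

// One direct stream is enough: the Kafka 0.10 direct API creates one Spark
// partition per Kafka partition of the topic, so parallelism comes for free.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD { rdd =>
  // Map to the String value first so only serializable data is shuffled,
  // then repartition the RDD itself if more processing parallelism is needed.
  rdd.map(_.value()).repartition(4).foreach(println)
}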

Upvotes: 0

Thang Nguyen

Reputation: 1110

The issue is clear: java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord. The ConsumerRecord class does not implement Serializable.

Try extracting the value field from each ConsumerRecord before the foreachRDD operation: kafkaStream.map(_.value()).

Update 1: The fix above doesn't work, because the exception happens at ssc.union(streams). The union requires data transfer between nodes, so the records must be serializable. You can fix the issue by extracting the value field with map before the union operation:

KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
).map(_.value())
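If you do keep multiple streams and union them as in the question, the same idea applies: map each stream to its value before the union, so the shuffle only moves plain Strings. A sketch reusing the question's variables:

val streams = (1 to numPartitionsOfInputTopic).map { _ =>
  KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  ).map(_.value()) // extract the serializable String value up front
}

// The union and the later repartition now shuffle Strings, not ConsumerRecords.
val unifiedStream = ssc.union(streams)
unifiedStream.repartition(sparkProcessingParallelism).foreachRDD { rdd =>
  rdd.foreach(println)
}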

Upvotes: 2
