duckertito

Reputation: 3625

The usage of a serializable object: Caused by: java.io.NotSerializableException

I am following this tutorial and other similar tutorials on task serialization, but my code fails with a task serialization error. I don't understand why this happens. I set the variable topicOutputMessages outside of foreachRDD and then read it within foreachPartition. Also, I create the KafkaProducer INSIDE foreachPartition. So, what is the problem here? I cannot really get the point.

val topicsSet = topicInputMessages.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> metadataBrokerList_InputQueue)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet).map(_._2)


messages.foreachRDD(rdd => {
    rdd.foreachPartition{iter =>
        UtilsDM.setMetadataBrokerList(metadataBrokerList)
        UtilsDM.setOutputTopic(topicOutputMessages)
        val producer = UtilsDM.createProducer
        iter.foreach { msg =>
              val record = new ProducerRecord[String, String](UtilsDM.getOutputTopic(), msg)
              producer.send(record)
        }
        producer.close()
    }
})

EDIT:

object UtilsDM extends Serializable {

  var topicOutputMessages: String = ""
  var metadataBrokerList: String = ""
  var producer: KafkaProducer[String, String] = null

  def setOutputTopic(t: String): Unit = {
    topicOutputMessages = t
  }

  def setMetadataBrokerList(m: String): Unit = {
    metadataBrokerList = m
  }

  def createProducer: KafkaProducer[String, String] = {

    val kafkaProps = new Properties()

    kafkaProps.put("bootstrap.servers", metadataBrokerList)

    // This is mandatory, even though we don't send key
    kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    kafkaProps.put("acks", "1")

    // how many times to retry when produce request fails?
    kafkaProps.put("retries", "3")
    // Upper limit (in bytes) on how much data the producer will batch before sending
    kafkaProps.put("batch.size", "5")
    // How long will the producer wait before sending in order to allow more messages to get accumulated in the same batch
    kafkaProps.put("linger.ms", "5")

    new KafkaProducer[String, String](kafkaProps)
  }

}

Full stacktrace:

16/11/21 13:47:30 ERROR JobScheduler: Error running job streaming job 1479732450000 ms.0
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
    at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:103)
    at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:93)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.test.consumer.kafka.KafkaDecisionsConsumer
Serialization stack:
    - object not serializable (class: org.test.consumer.kafka.KafkaDecisionsConsumer, value: org.test.consumer.kafka.KafkaDecisionsConsumer@4a0ee025)
    - field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer)
    - object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, <function1>)
    - field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1)
    - object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 30 more
16/11/21 13:47:30 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Task not serializable
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
    at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:103)
    at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:93)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.test.consumer.kafka.KafkaDecisionsConsumer
Serialization stack:
    - object not serializable (class: org.test.consumer.kafka.KafkaDecisionsConsumer, value: org.test.consumer.kafka.KafkaDecisionsConsumer@4a0ee025)
    - field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer)
    - object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, <function1>)
    - field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1)
    - object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 30 more

Upvotes: 6

Views: 2408

Answers (3)

maasg

Reputation: 37435

The serialization issue lies in how Spark handles closure serialization (which you can read about in detail in this answer: How spark handles object).

In the failing code, referencing metadataBrokerList and topicOutputMessages here:

rdd.foreachPartition{iter =>
    UtilsDM.setMetadataBrokerList(metadataBrokerList)
    UtilsDM.setOutputTopic(topicOutputMessages)

creates a reference to the outer object in which these variables are defined, and forces Spark's closure cleaner to include it in the "cleaned" closure. That outer object includes sparkContext and streamingContext in the closure; they are not serializable, hence the serialization exception.
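To make that concrete, here is a minimal sketch of the pattern; the class shape is assumed from the stack trace, it is not the actual KafkaDecisionsConsumer code:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Hypothetical reconstruction of the enclosing class, based on the stack trace.
class KafkaDecisionsConsumer(val ssc: StreamingContext, val metadataBrokerList: String) {

  def run(messages: DStream[String]): Unit = {
    messages.foreachRDD { rdd =>
      rdd.foreachPartition { _ =>
        // `metadataBrokerList` is a field, so the compiler reads it as
        // `this.metadataBrokerList`: the closure keeps a `$outer` reference to
        // the whole KafkaDecisionsConsumer, which is not serializable (it also
        // holds the StreamingContext) -> "Task not serializable".
        UtilsDM.setMetadataBrokerList(metadataBrokerList)
      }
    }
  }
}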

In the second attempt (the workaround posted as an answer), those links are broken: the variables are now contained in the helper object, so the closure can be "cut clean" from the outer context.

I would think that adding @transient to these variables is not necessary within the UtilsDM object, given that the values are serializable. Be aware that singleton objects are recreated in each executor, so the value of mutable variables changed on the driver will not be available in the executors, often leading to a NullPointerException if not handled properly.
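For example, here is a hypothetical, self-contained sketch of that pitfall (the object and app names are made up for illustration, not taken from the question):

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical singleton, mirroring the mutable state in UtilsDM.
object BrokerConfig extends Serializable {
  var metadataBrokerList: String = ""   // only ever mutated on the driver
}

object SingletonPitfallDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("singleton-pitfall"))

    BrokerConfig.metadataBrokerList = "broker1:9092"   // driver-side assignment

    sc.parallelize(1 to 4, 2).foreachPartition { _ =>
      // On a real cluster each executor JVM initialises its own copy of the
      // BrokerConfig singleton, so the driver-side assignment above is not
      // visible here and this prints an empty string. (In local mode driver
      // and executor share one JVM, which hides the problem.)
      println(s"metadataBrokerList = '${BrokerConfig.metadataBrokerList}'")
    }

    sc.stop()
  }
}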

There's a serialization trick that would help in the original scenario:

Copy the referenced variables into local vals before the closure that is shipped to the executors, i.e. inside foreachRDD but outside foreachPartition, e.g.

messages.foreachRDD { rdd =>
    val innerMDBL = metadataBrokerList   // local copies, evaluated on the driver
    val innerTOM  = topicOutputMessages
    rdd.foreachPartition { iter =>
        UtilsDM.setMetadataBrokerList(innerMDBL)
        UtilsDM.setOutputTopic(innerTOM)

That way, the closure shipped to the executors captures only the local String copies and no longer has any link to outer.

To deal with executor-bound objects (like non-serializable connections or even local caches) I prefer to use an instance factory approach, as explained in this answer: Redis on Spark: Task not serializable
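A minimal sketch of that pattern for the producer in this question (the names are illustrative and not taken from the linked answer; closing the shared producer, e.g. from a JVM shutdown hook, is left out):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Lazily creates one KafkaProducer per executor JVM; only plain Strings ever
// travel in the closure.
object KafkaProducerFactory {
  private var producer: KafkaProducer[String, String] = _

  def getOrCreate(brokerList: String): KafkaProducer[String, String] = synchronized {
    if (producer == null) {
      val props = new Properties()
      props.put("bootstrap.servers", brokerList)
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      producer = new KafkaProducer[String, String](props)
    }
    producer
  }
}

Using it from the streaming job in the question would then look roughly like this (assuming messages, metadataBrokerList and topicOutputMessages are in scope as in the question, with the broker list and topic copied into local vals on the driver first, as described above):

messages.foreachRDD { rdd =>
  val brokers = metadataBrokerList   // local copies, no link to `outer`
  val topic   = topicOutputMessages
  rdd.foreachPartition { iter =>
    val producer = KafkaProducerFactory.getOrCreate(brokers)
    iter.foreach(msg => producer.send(new ProducerRecord[String, String](topic, msg)))
    // The producer is deliberately not closed here: it is reused by later
    // tasks running on the same executor.
  }
}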

Upvotes: 2

duckertito

Reputation: 3625

This is not the answer to my question, but it is an option that works. Maybe someone could elaborate on it in a final answer? The disadvantage of this solution is that metadataBrokerList and topicOutputMessages have to be hard-coded in UtilsDM via @transient lazy val topicOutputMessages and @transient lazy val metadataBrokerList, but ideally I would like to be able to pass these parameters as input parameters (a rough sketch of that variant follows the code below):

object TestRunner {

  var zkQuorum: String = ""
  var metadataBrokerList: String = ""
  var group: String = ""
  var topicInputMessages: String = ""

  def main(args: Array[String]) {

    if (args.length < 4) {
      System.err.println("Usage: TestRunner <zkQuorum> <metadataBrokerList> " +
                          "<group> <topicInputMessages>")
      System.exit(1)
    }

    val Array(zkQuorum,metadataBrokerList,group,topicInputMessages) = args

    setParameters(zkQuorum,metadataBrokerList,group,topicInputMessages)

    run()

  }


  def setParameters(mi: String,
                    mo: String,
                    g: String,t: String) {
    zkQuorum = mi
    metadataBrokerList = mo
    group = g
    topicInputMessages = t
  }


  def run(): Unit = {
    val conf = new SparkConf()
      .setAppName("TEST")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("~/checkpointDir")

    val ssc = new StreamingContext(sc, Seconds(5))

    val topicMessagesMap = topicInputMessages.split(",").map((_, 1)).toMap
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMessagesMap).map(_._2)

    messages.foreachRDD(rdd => {
      rdd.foreachPartition{iter =>
        val producer = UtilsDM.createProducer
        iter.foreach { msg =>
          val record = new ProducerRecord[String, String](UtilsDM.topicOutputMessages, msg)
          producer.send(record)
        }
        producer.close()
      }

    })

    ssc.start()
    ssc.awaitTermination()

  }

}



object UtilsDM extends Serializable {

  @transient lazy val topicOutputMessages: String = "myTestTopic"
  @transient lazy val metadataBrokerList: String = "172.12.34.233:9092"

  var producer: KafkaProducer[String, String] = null

  def createProducer: KafkaProducer[String, String] = {

    val kafkaProps = new Properties()

    kafkaProps.put("bootstrap.servers", metadataBrokerList)

    // This is mandatory, even though we don't send key
    kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    kafkaProps.put("acks", "1")

    // how many times to retry when produce request fails?
    kafkaProps.put("retries", "3")
    // Upper limit (in bytes) on how much data the producer will batch before sending
    kafkaProps.put("batch.size", "5")
    // How long will the producer wait before sending in order to allow more messages to get accumulated in the same batch
    kafkaProps.put("linger.ms", "5")

    new KafkaProducer[String, String](kafkaProps)
  }

}
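For completeness, here is a rough sketch (untested; the five-argument main/run and their argument order are my own assumptions, not part of the original code) of how the broker list and output topic could be passed in from the command line instead of being hard-coded. Because they end up as plain local Strings inside run(), the closure shipped to the executors captures only the two strings, never an enclosing object or the StreamingContext:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object TestRunner {

  def main(args: Array[String]): Unit = {
    // Hypothetical argument order: all five values come straight from the CLI.
    val Array(zkQuorum, group, topicInputMessages, metadataBrokerList, topicOutputMessages) = args
    run(zkQuorum, group, topicInputMessages, metadataBrokerList, topicOutputMessages)
  }

  def run(zkQuorum: String,
          group: String,
          topicInputMessages: String,
          metadataBrokerList: String,
          topicOutputMessages: String): Unit = {
    val sc  = new SparkContext(new SparkConf().setAppName("TEST"))
    val ssc = new StreamingContext(sc, Seconds(5))

    val topicMessagesMap = topicInputMessages.split(",").map((_, 1)).toMap
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMessagesMap).map(_._2)

    messages.foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        // metadataBrokerList and topicOutputMessages are method parameters
        // (local Strings), so serializing this closure only ships the strings.
        val producer = {
          val props = new Properties()
          props.put("bootstrap.servers", metadataBrokerList)
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
          new KafkaProducer[String, String](props)
        }
        iter.foreach(msg => producer.send(new ProducerRecord[String, String](topicOutputMessages, msg)))
        producer.close()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}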

Upvotes: 0

TheMP

Reputation: 8427

I think the problem lies in your UtilsDM class. It is being captured by the closure, and Spark attempts to serialize the code to ship it to the executors.

Try to make UtilsDM serializable or create it within the foreachRDD function.

Upvotes: 1
