Reputation: 3625
I followed this tutorial and other similar tutorials on Task serialization, but my code still fails with a "Task not serializable"
error. I don't understand why this happens. I am setting the variable topicOutputMessages
outside of foreachRDD
and then I am reading it within foreachPartition
. Also I create KafkaProducer
INSIDE foreachPartition
. So, what is the problem here? I cannot really see what I am missing.
// Subscribe to every topic listed in the comma-separated topicInputMessages string.
val topicsSet = topicInputMessages.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> metadataBrokerList_InputQueue)
// map(_._2) keeps only the second element of each tuple emitted by the direct stream.
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet).map(_._2)
messages.foreachRDD(rdd => {
  rdd.foreachPartition{iter =>
    // NOTE(review): metadataBrokerList and topicOutputMessages are not declared in this snippet.
    // If they are members of the enclosing class, reading them here makes the closure capture
    // that instance (the `$outer` field reported in the serialization stack further down).
    UtilsDM.setMetadataBrokerList(metadataBrokerList)
    UtilsDM.setOutputTopic(topicOutputMessages)
    // One producer per partition, created inside the closure rather than captured by it.
    val producer = UtilsDM.createProducer
    iter.foreach { msg =>
      val record = new ProducerRecord[String, String](UtilsDM.getOutputTopic(), msg)
      producer.send(record)
    }
    producer.close()
  }
})
EDIT:
/**
 * Holds the Kafka output settings (topic and broker list) and builds producers from them.
 *
 * The settings are plain mutable fields of this singleton, so they have to be set in the same
 * JVM that later calls [[createProducer]] or [[getOutputTopic]].
 */
object UtilsDM extends Serializable {
  var topicOutputMessages: String = ""
  var metadataBrokerList: String = ""
  // Never assigned inside this object; createProducer hands out a fresh instance instead.
  var producer: KafkaProducer[String, String] = null

  /** Stores the topic that outgoing records should be written to. */
  def setOutputTopic(t: String): Unit = {
    topicOutputMessages = t
  }

  /** Returns the topic last stored with [[setOutputTopic]] (empty string if none was set). */
  def getOutputTopic(): String = topicOutputMessages

  /** Stores the broker list that [[createProducer]] passes as "bootstrap.servers". */
  def setMetadataBrokerList(m: String): Unit = {
    metadataBrokerList = m
  }

  /**
   * Builds a new String/String producer from the currently stored broker list.
   *
   * @return a new producer on every call; nothing in this object closes it
   */
  def createProducer: KafkaProducer[String, String] = {
    val kafkaProps = new Properties()
    kafkaProps.put("bootstrap.servers", metadataBrokerList)
    // This is mandatory, even though we don't send key
    kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    kafkaProps.put("acks", "1")
    // how many times to retry when produce request fails?
    kafkaProps.put("retries", "3")
    // This is an upper limit of how many messages Kafka Producer will attempt to batch before sending (bytes)
    // NOTE(review): a 5-byte limit presumably leaves no room for batching - confirm this is intended.
    kafkaProps.put("batch.size", "5")
    // How long will the producer wait before sending in order to allow more messages to get accumulated in the same batch
    kafkaProps.put("linger.ms", "5")
    new KafkaProducer[String, String](kafkaProps)
  }
}
Full stacktrace:
16/11/21 13:47:30 ERROR JobScheduler: Error running job streaming job 1479732450000 ms.0
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:103)
at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:93)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.test.consumer.kafka.KafkaDecisionsConsumer
Serialization stack:
- object not serializable (class: org.test.consumer.kafka.KafkaDecisionsConsumer, value: org.test.consumer.kafka.KafkaDecisionsConsumer@4a0ee025)
- field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer)
- object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, <function1>)
- field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1)
- object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
... 30 more
16/11/21 13:47:30 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Task not serializable
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:103)
at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:93)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.test.consumer.kafka.KafkaDecisionsConsumer
Serialization stack:
- object not serializable (class: org.test.consumer.kafka.KafkaDecisionsConsumer, value: org.test.consumer.kafka.KafkaDecisionsConsumer@4a0ee025)
- field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer)
- object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, <function1>)
- field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1)
- object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
... 30 more
Upvotes: 6
Views: 2408
Reputation: 37435
The serialization issue lies in how Spark deals with closure serialization (which you can read in detail in this answer: How spark handles object )
In the failing code, referencing metadataBrokerList
and topicOutputMessages
here:
rdd.foreachPartition{iter =>
UtilsDM.setMetadataBrokerList(metadataBrokerList)
UtilsDM.setOutputTopic(topicOutputMessages)
creates a reference to the outer object where these variables are created, and forces the closure cleaner in Spark to include that outer object in the "cleaned" closure. outer
then includes sparkContext
and streamingContext
in the closure, which are not serializable and hence the serialization exception.
In the second attempt (in the workaround posted as an answer), these links are broken as the variables are now contained in the helper object and the closure can be "cut clean" from the outer
context.
I'd think that adding @transient
to these variables is not necessary within the UtilsDM
object, given that the values are serializable. Be aware that singleton objects are recreated in each executor. Therefore the value of mutable variables changed in the driver will not be available in the executors, often leading to NullPointerException if not handled properly.
There's a serialization trick that would help in the original scenario:
Copy the referenced variables into local vals, so that the closure only uses those local copies. e.g.
// Take the copies BEFORE the closure is created: local vals are captured by value, whereas
// reading the fields inside the closure body would still go through the outer instance.
val innerMDBL = metadataBrokerList
val innerTOM = topicOutputMessages
rdd.foreachPartition{iter =>
  UtilsDM.setMetadataBrokerList(innerMDBL)
  UtilsDM.setOutputTopic(innerTOM)
That way, the closure only captures the copied values (plain, serializable strings) and there's no longer any link with outer.
To deal with executor-bound objects (like non-serializable connections or even local caches) I prefer to use an instance factory approach, as explained in this answer: Redis on Spark:Task not serializable
Upvotes: 2
Reputation: 3625
This is not the answer to my question, but it is an option that works. Maybe someone could elaborate on it in a final answer? The disadvantage of this solution is that metadataBrokerList
and topicOutputMessages
have to be hard-coded in the code of UtilsDM
using @transient lazy val topicOutputMessages
and @transient lazy val metadataBrokerList
, but ideally I would like to be able to pass these parameters as input parameters:
/**
 * Driver that reads messages from the input Kafka topics and forwards each one to the output
 * topic configured in UtilsDM.
 *
 * Command line: <zkQuorum> <metadataBrokerList> <group> <topicInputMessages> [kafkaNumThreads]
 */
object TestRunner {
  var zkQuorum: String = ""
  var metadataBrokerList: String = ""
  var group: String = ""
  // Has to be a var: setParameters assigns it.
  var topicInputMessages: String = ""

  private val Usage: String =
    "Usage: TestRunner <zkQuorum> <metadataBrokerList> " +
      "<group> <topicInputMessages> [kafkaNumThreads]"

  /**
   * Parses the command line, stores the settings and starts the streaming job.
   *
   * @param args four mandatory values (see the usage string) plus an optional positive
   *             thread count, which defaults to 1 when absent
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println(Usage)
      sys.exit(1)
    }
    // Distinct local names so the object's fields are not shadowed.
    val Array(zk, brokers, consumerGroup, inputTopics) = args.take(4)
    val kafkaNumThreads = args.lift(4) match {
      case None => 1
      case Some(raw) =>
        scala.util.Try(raw.toInt).toOption.filter(_ > 0).getOrElse {
          System.err.println(Usage)
          sys.exit(1)
        }
    }
    setParameters(zk, brokers, consumerGroup, inputTopics)
    run(kafkaNumThreads)
  }

  /**
   * Stores the connection settings in this object's fields.
   *
   * @param mi value for zkQuorum
   * @param mo value for metadataBrokerList
   * @param g  value for group
   * @param t  value for topicInputMessages (comma-separated topic names)
   */
  def setParameters(mi: String,
                    mo: String,
                    g: String, t: String): Unit = {
    zkQuorum = mi
    metadataBrokerList = mo
    group = g
    topicInputMessages = t
  }

  /**
   * Builds the streaming pipeline and blocks until the streaming context terminates.
   *
   * @param kafkaNumThreads value associated with every input topic in the map handed to
   *                        KafkaUtils.createStream
   */
  def run(kafkaNumThreads: Int): Unit = {
    val conf = new SparkConf().setAppName("TEST")
    val sc = new SparkContext(conf)
    // NOTE(review): "~" is presumably not expanded to the home directory here - confirm, and
    // prefer an absolute path.
    sc.setCheckpointDir("~/checkpointDir")
    val ssc = new StreamingContext(sc, Seconds(5))
    val topicMessagesMap = topicInputMessages.split(",").map((_, kafkaNumThreads)).toMap
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMessagesMap).map(_._2)
    messages.foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        // Only the UtilsDM singleton is touched in here; no field of TestRunner is read, so the
        // closure has nothing from the driver to capture.
        val outputTopic = UtilsDM.topicOutputMessages
        val producer = UtilsDM.createProducer
        try {
          iter.foreach { msg =>
            producer.send(new ProducerRecord[String, String](outputTopic, msg))
          }
        } finally {
          // Close the producer even when a send fails.
          producer.close()
        }
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
/**
 * Helper with hard-coded Kafka output settings and a factory for producers.
 *
 * Both settings are constants defined in this object, so callers do not pass anything in.
 */
object UtilsDM extends Serializable {
  @transient lazy val topicOutputMessages: String = "myTestTopic"
  @transient lazy val metadataBrokerList: String = "172.12.34.233:9092"
  // Never assigned inside this object; createProducer hands out a fresh instance instead.
  var producer: KafkaProducer[String, String] = null

  /** Returns the hard-coded output topic. */
  def getOutputTopic(): String = topicOutputMessages

  /**
   * Builds a new String/String producer for the hard-coded broker list.
   *
   * @return a new producer on every call; nothing in this object closes it
   */
  def createProducer: KafkaProducer[String, String] = {
    val kafkaProps = new Properties()
    kafkaProps.put("bootstrap.servers", metadataBrokerList)
    // This is mandatory, even though we don't send key
    kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    kafkaProps.put("acks", "1")
    // how many times to retry when produce request fails?
    kafkaProps.put("retries", "3")
    // This is an upper limit of how many messages Kafka Producer will attempt to batch before sending (bytes)
    // NOTE(review): a 5-byte limit presumably leaves no room for batching - confirm this is intended.
    kafkaProps.put("batch.size", "5")
    // How long will the producer wait before sending in order to allow more messages to get accumulated in the same batch
    kafkaProps.put("linger.ms", "5")
    new KafkaProducer[String, String](kafkaProps)
  }
}
Upvotes: 0
Reputation: 8427
I think the problem lies in your UtilsDM
class. It is being captured by closure and Spark attempts to serialize the code to ship it to executors.
Try to make UtilsDM serializable or create it within the foreachRDD function.
Upvotes: 1