Reputation: 3260
How can I integrate Kafka producer with spark stateful streaming which uses checkpoint along with StreamingContext.getOrCreate
.
I read this post: How to write spark streaming DF to Kafka topic and implemented the method mentioned in this post: Spark and Kafka integration patterns
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
lazy val producer = createProducer()
def send(topic: String, value: String): Unit = producer.send(new ProducerRecord(topic, value))
}
object KafkaSink {
def apply(): KafkaSink = {
val f = () => {
val kafkaProducerProps: Properties = {
val props = new Properties()
props.put("bootstrap.servers", "127.0.0.1:9092")
props.setProperty("batch.size", "8192");
props.put("key.serializer", classOf[StringSerializer].getName)
props.put("value.serializer", classOf[StringSerializer].getName)
props.setProperty("request.timeout.ms", "60000")
props
}
val producer = new KafkaProducer[String, String](kafkaProducerProps)
producer
}
new KafkaSink(f)
}
}
and
package webmetric
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringDeserializer, StringSerializer}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
import java.util.Properties
import scala.concurrent.Future;
object RecoverableJsonProcess {
def createContext(checkpointDirectory: String)
: StreamingContext = {
// If you do not see this printed, that means the StreamingContext has been loaded
// from the new checkpoint
println("Creating new context")
val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount").setMaster("local[2]")
// Create the context with a 1 second batch size
val ssc = new StreamingContext(sparkConf, Seconds(4))
ssc.checkpoint(checkpointDirectory)
...
...
val globalKafkaSink = ssc.sparkContext.broadcast(KafkaSink())
val mappingFunc = ...
val stateDstream = xitemPairs.mapWithState(
StateSpec.function(mappingFunc))
stateDstream.foreachRDD { rdd =>
rdd.foreach { message =>
globalKafkaSink.value.send("mytopic",message.toString())
}
}
stateDstream.print()
ssc
}
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
System.setProperty("hadoop.home.dir", "H:\\work\\spark\\")
val checkpointDirectory = "H:/work/spark/chk2"
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
() => createContext(checkpointDirectory))
ssc.start()
ssc.awaitTermination()
}
}
I did it and it works in first run which create a new context. But when try to get context from checkpoint directory after running it rises the following error when using Kafka producer:
22/04/20 09:34:44 ERROR Executor: Exception in task 0.0 in stage 99.0 (TID 35)
java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to webmetric.MySparkKafkaProducer
at webmetric.RecoverableJsonProcess$.$anonfun$createContext$8(RecoverableJsonProcess.scala:92)
at webmetric.RecoverableJsonProcess$.$anonfun$createContext$8$adapted(RecoverableJsonProcess.scala:90)
at scala.collection.AbstractIterator.foreach(Iterator.scala:932)
at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1012)
at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1012)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
22/04/20 09:34:44 WARN TaskSetManager: Lost task 0.0 in stage 99.0 (TID 35) (hajibaba.PC executor driver): java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to webmetric.MySparkKafkaProducer
at webmetric.RecoverableJsonProcess$.$anonfun$createContext$8(RecoverableJsonProcess.scala:92)
at webmetric.RecoverableJsonProcess$.$anonfun$createContext$8$adapted(RecoverableJsonProcess.scala:90)
at scala.collection.AbstractIterator.foreach(Iterator.scala:932)
at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1012)
at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1012)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
22/04/20 09:34:44 ERROR TaskSetManager: Task 0 in stage 99.0 failed 1 times; aborting job
22/04/20 09:34:44 ERROR JobScheduler: Error running job streaming job 1650431044000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 99.0 failed 1 times, most recent failure: Lost task 0.0 in stage 99.0 (TID 35) (hajibaba.PC executor driver): java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to webmetric.MySparkKafkaProducer
at webmetric.RecoverableJsonProcess$.$anonfun$createContext$8(RecoverableJsonProcess.scala:92)
at webmetric.RecoverableJsonProcess$.$anonfun$createContext$8$adapted(RecoverableJsonProcess.scala:90)
at scala.collection.AbstractIterator.foreach(Iterator.scala:932)
at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1012)
at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1012)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2251)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2200)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2199)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2199)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2438)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2380)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2369)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2267)
at org.apache.spark.rdd.RDD.$anonfun$foreach$1(RDD.scala:1012)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:1010)
at webmetric.RecoverableJsonProcess$.$anonfun$createContext$7(RecoverableJsonProcess.scala:90)
at webmetric.RecoverableJsonProcess$.$anonfun$createContext$7$adapted(RecoverableJsonProcess.scala:89)
at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2(DStream.scala:629)
at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2$adapted(DStream.scala:629)
at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.util.Try$.apply(Try.scala:209)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:256)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to webmetric.MySparkKafkaProducer
at webmetric.RecoverableJsonProcess$.$anonfun$createContext$8(RecoverableJsonProcess.scala:92)
at webmetric.RecoverableJsonProcess$.$anonfun$createContext$8$adapted(RecoverableJsonProcess.scala:90)
at scala.collection.AbstractIterator.foreach(Iterator.scala:932)
at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1012)
at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1012)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
... 3 more
Upvotes: 0
Views: 194
Reputation: 3173
I dug around this a little bit and found out that streaming doesn't support broadcast. Take a look at this issue on official spark issues page. If I understood correctly, broadcast variables will cause exceptions when recovering from checkpoint, since the actual data is lost in executor side, and recovery from driver side is not possible. Now there are some workarounds for this, I found this one which looks promising, in the mentioned example, it uses an accumulator, but there should be similar implementation for broadcast.
Upvotes: 1