Majid Hajibaba
Majid Hajibaba

Reputation: 3260

spark stateful streaming with checkpoint + kafka producer

How can I integrate Kafka producer with spark stateful streaming which uses checkpoint along with StreamingContext.getOrCreate.

I read this post: How to write spark streaming DF to Kafka topic and implemented the method mentioned in this post: Spark and Kafka integration patterns

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import java.util.Properties

class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {

  lazy val producer = createProducer()

  def send(topic: String, value: String): Unit = producer.send(new ProducerRecord(topic, value))
}

object KafkaSink {
  def apply(): KafkaSink = {
    val f = () => {

      val kafkaProducerProps: Properties = {
        val props = new Properties()
        props.put("bootstrap.servers", "127.0.0.1:9092")
        props.setProperty("batch.size", "8192");
        props.put("key.serializer", classOf[StringSerializer].getName)
        props.put("value.serializer", classOf[StringSerializer].getName)
        props.setProperty("request.timeout.ms", "60000")

        props
      }

      val producer = new KafkaProducer[String, String](kafkaProducerProps)

      producer
    }
    new KafkaSink(f)
  }
}

and

package webmetric

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringDeserializer, StringSerializer}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

import java.util.Properties
import scala.concurrent.Future;

object RecoverableJsonProcess {

  def createContext(checkpointDirectory: String)
  : StreamingContext = {

    // If you do not see this printed, that means the StreamingContext has been loaded
    // from the new checkpoint
    println("Creating new context")

    val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount").setMaster("local[2]")
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(4))
    ssc.checkpoint(checkpointDirectory)

   ...
   ...

    val globalKafkaSink = ssc.sparkContext.broadcast(KafkaSink())

    val mappingFunc = ...

    val stateDstream = xitemPairs.mapWithState(
      StateSpec.function(mappingFunc))

    stateDstream.foreachRDD { rdd =>
      rdd.foreach { message =>

        globalKafkaSink.value.send("mytopic",message.toString())
      }
    }

    stateDstream.print()

    ssc
  }

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    System.setProperty("hadoop.home.dir", "H:\\work\\spark\\")

    val checkpointDirectory = "H:/work/spark/chk2"
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => createContext(checkpointDirectory))

    ssc.start()
    ssc.awaitTermination()
  }

}

I did it and it works in first run which create a new context. But when try to get context from checkpoint directory after running it rises the following error when using Kafka producer:

22/04/20 09:34:44 ERROR Executor: Exception in task 0.0 in stage 99.0 (TID 35)
java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to webmetric.MySparkKafkaProducer
    at webmetric.RecoverableJsonProcess$.$anonfun$createContext$8(RecoverableJsonProcess.scala:92)
    at webmetric.RecoverableJsonProcess$.$anonfun$createContext$8$adapted(RecoverableJsonProcess.scala:90)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:932)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1012)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1012)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
22/04/20 09:34:44 WARN TaskSetManager: Lost task 0.0 in stage 99.0 (TID 35) (hajibaba.PC executor driver): java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to webmetric.MySparkKafkaProducer
    at webmetric.RecoverableJsonProcess$.$anonfun$createContext$8(RecoverableJsonProcess.scala:92)
    at webmetric.RecoverableJsonProcess$.$anonfun$createContext$8$adapted(RecoverableJsonProcess.scala:90)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:932)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1012)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1012)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

22/04/20 09:34:44 ERROR TaskSetManager: Task 0 in stage 99.0 failed 1 times; aborting job
22/04/20 09:34:44 ERROR JobScheduler: Error running job streaming job 1650431044000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 99.0 failed 1 times, most recent failure: Lost task 0.0 in stage 99.0 (TID 35) (hajibaba.PC executor driver): java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to webmetric.MySparkKafkaProducer
    at webmetric.RecoverableJsonProcess$.$anonfun$createContext$8(RecoverableJsonProcess.scala:92)
    at webmetric.RecoverableJsonProcess$.$anonfun$createContext$8$adapted(RecoverableJsonProcess.scala:90)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:932)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1012)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1012)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2251)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2200)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2199)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2199)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2438)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2380)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2369)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2267)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$1(RDD.scala:1012)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:1010)
    at webmetric.RecoverableJsonProcess$.$anonfun$createContext$7(RecoverableJsonProcess.scala:90)
    at webmetric.RecoverableJsonProcess$.$anonfun$createContext$7$adapted(RecoverableJsonProcess.scala:89)
    at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2(DStream.scala:629)
    at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2$adapted(DStream.scala:629)
    at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
    at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at scala.util.Try$.apply(Try.scala:209)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:256)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to webmetric.MySparkKafkaProducer
    at webmetric.RecoverableJsonProcess$.$anonfun$createContext$8(RecoverableJsonProcess.scala:92)
    at webmetric.RecoverableJsonProcess$.$anonfun$createContext$8$adapted(RecoverableJsonProcess.scala:90)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:932)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1012)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1012)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    ... 3 more

Upvotes: 0

Views: 194

Answers (1)

AminMal
AminMal

Reputation: 3173

I dug around this a little bit and found out that streaming doesn't support broadcast. Take a look at this issue on official spark issues page. If I understood correctly, broadcast variables will cause exceptions when recovering from checkpoint, since the actual data is lost in executor side, and recovery from driver side is not possible. Now there are some workarounds for this, I found this one which looks promising, in the mentioned example, it uses an accumulator, but there should be similar implementation for broadcast.

Upvotes: 1

Related Questions