pharpan
pharpan

Reputation: 65

Why does streaming job fail with NullPointerException when restarted from checkpoint?

I have started spark streaming recently and implementing checkpoint. I'm storing the checkpoint in HDFS. when the streaming failed it's able to go back to the last checkpoint but getting NullPointerException and the streaming job is getting killed. I'm able to see the checkpoints in HDFS. Not sure why I'm getting the exception even though there is chckpoint in HDFS. Any inputs will be helpful.

17/04/10 16:30:47 INFO JobGenerator: Batches during down time (2 batches):1491841680000 ms, 1491841800000 ms
17/04/10 16:30:47 INFO JobGenerator: Batches pending processing (0 batches): 
17/04/10 16:30:47 INFO JobGenerator: Batches to reschedule (2 batches): 1491841680000 ms, 1491841800000 ms
17/04/10 16:30:48 INFO JobScheduler: Added jobs for time 1491841680000 ms
17/04/10 16:30:48 INFO JobScheduler: Starting job streaming job 1491841680000 ms.0 from job set of time 1491841680000 ms
17/04/10 16:30:48 INFO SparkContext: Starting job: isEmpty at piadj.scala:34
17/04/10 16:30:48 INFO DAGScheduler: Got job 0 (isEmpty at piadj.scala:34) with 1 output partitions
17/04/10 16:30:48 INFO DAGScheduler: Final stage: ResultStage 0 (isEmpty at piadj.scala:34)
17/04/10 16:30:48 INFO DAGScheduler: Parents of final stage: List()
17/04/10 16:30:48 INFO DAGScheduler: Missing parents: List()
17/04/10 16:30:48 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at piadj.scala:32), which has no missing parents
17/04/10 16:30:48 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 4.1 KB, free 4.1 KB)
17/04/10 16:30:48 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.1 KB, free 6.1 KB)
17/04/10 16:30:48 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.26.118.23:35738 (size: 2.1 KB, free: 5.8 GB)
17/04/10 16:30:48 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1008
17/04/10 16:30:48 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at piadj.scala:32)
17/04/10 16:30:48 INFO YarnClusterScheduler: Adding task set 0.0 with 1 tasks
17/04/10 16:30:48 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, oser402370.wal-mart.com, partition 0,ANY, 2108 bytes)
17/04/10 16:30:48 INFO JobScheduler: Added jobs for time 1491841800000 ms
17/04/10 16:30:48 INFO RecurringTimer: Started timer for JobGenerator at time 1491841920000
17/04/10 16:30:48 INFO JobGenerator: Restarted JobGenerator at 1491841920000 ms
17/04/10 16:30:48 INFO JobScheduler: Starting job streaming job 1491841800000 ms.0 from job set of time 1491841800000 ms
17/04/10 16:30:48 INFO JobScheduler: Started JobScheduler
17/04/10 16:30:48 INFO StreamingContext: StreamingContext started
17/04/10 16:30:48 INFO SparkContext: Starting job: isEmpty at piadj.scala:34
17/04/10 16:30:48 INFO DAGScheduler: Got job 1 (isEmpty at piadj.scala:34) with 1 output partitions
17/04/10 16:30:48 INFO DAGScheduler: Final stage: ResultStage 1 (isEmpty at piadj.scala:34)
17/04/10 16:30:48 INFO DAGScheduler: Parents of final stage: List()
17/04/10 16:30:48 INFO DAGScheduler: Missing parents: List()
17/04/10 16:30:48 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[3] at map at piadj.scala:32), which has no missing parents
17/04/10 16:30:48 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.1 KB, free 10.2 KB)
17/04/10 16:30:48 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.1 KB, free 12.3 KB)
17/04/10 16:30:48 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 172.26.118.23:35738 (size: 2.1 KB, free: 5.8 GB)
17/04/10 16:30:48 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1008
17/04/10 16:30:48 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[3] at map at piadj.scala:32)  
17/04/10 16:30:48 INFO YarnClusterScheduler: Adding task set 1.0 with 1 tasks
17/04/10 16:30:48 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, XXXXXXX, partition 0,ANY, 2108 bytes)
17/04/10 16:30:48 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on XXXXXXX (size: 2.1 KB, free: 4.3 GB) 
17/04/10 16:30:48 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on XXXXXXX (size: 2.1 KB, free: 4.3 GB)
17/04/10 16:30:49 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1142 ms on XXXXXXX (1/1)
17/04/10 16:30:49 INFO YarnClusterScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool 
17/04/10 16:30:49 INFO DAGScheduler: ResultStage 0 (isEmpty at piadj.scala:34) finished in 1.151 s
17/04/10 16:30:49 INFO DAGScheduler: Job 0 finished: isEmpty at piadj.scala:34, took 1.466449 s
17/04/10 16:30:49 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 979 ms on XXXXXXX (1/1)
17/04/10 16:30:49 INFO YarnClusterScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool 
17/04/10 16:30:49 INFO DAGScheduler: ResultStage 1 (isEmpty at piadj.scala:34) finished in 0.983 s
17/04/10 16:30:49 INFO DAGScheduler: Job 1 finished: isEmpty at piadj.scala:34, took 1.006658 s
17/04/10 16:30:49 INFO JobScheduler: Finished job streaming job 1491841680000 ms.0 from job set of time 1491841680000 ms
17/04/10 16:30:49 INFO JobScheduler: Total delay: 169.575 s for time 1491841680000 ms (execution: 1.568 s)
17/04/10 16:30:49 ERROR JobScheduler: Error running job streaming job 1491841680000 ms.0
java.lang.NullPointerException
at org.apache.spark.sql.SQLConf.getConf(SQLConf.scala:638)
at org.apache.spark.sql.SQLConf.defaultDataSourceName(SQLConf.scala:558)
at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:362)
at org.apache.spark.sql.SQLContext.read(SQLContext.scala:623)
at walmart.com.piadj$$anonfun$createContext$1.apply(piadj.scala:39)
at walmart.com.piadj$$anonfun$createContext$1.apply(piadj.scala:33)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:227)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:227)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:227)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:226)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Below is my code

def createContext(brokers:String,topics:String,checkpointDirectory:String):StreamingContext={    
val sparkConf = new SparkConf().setAppName("pi")
val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.setConf("hive.exec.dynamic.partition", "true")
sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
val ssc = new StreamingContext(sc, Seconds(1))
ssc.checkpoint(checkpointDirectory)
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topicsSet)
val lines = messages.map(_._2)
lines.foreachRDD { rdd =>
  if(!rdd.isEmpty()) {
    import sqlContext.implicits._
    val rdd2 = rdd.map(x => new JsonDeserializer().deserialize("pi_adj",x))        
    val rdd3 = rdd2.map(x => new String(x,"UTF-8"))
    val df1 = sqlContext.read.json(rdd3)
  /*some other transformations and inserting into hive*/
}
}
ssc
}
def main(args: Array[String]) {
if (args.length < 3) {
  System.err.println("Usage: streaming <brokers> <topics> <checkpointDirectory>")
  System.exit(1)
}
val Array(brokers,topics,checkpointDirectory) = args
val ssc = StreamingContext.getOrCreate(checkpointDirectory,()=>createContext(brokers,topics,checkpointDirectory))
ssc.start()
ssc.awaitTermination()
 }

Upvotes: 1

Views: 1334

Answers (2)

1580923067
1580923067

Reputation: 29

if(!rdd.isEmpty()) {
    import sqlContext.implicits._
    val rdd2 = rdd.map(x => new JsonDeserializer().deserialize("pi_adj",x))        
    val rdd3 = rdd2.map(x => new String(x,"UTF-8"))
    val df1 = sqlContext.read.json(rdd3)
  /*some other transformations and inserting into hive*/
}
if(!rdd.isEmpty()) {
    val spark = 
      SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
    import spark.implicits._
    val rdd2 = rdd.map(x => new JsonDeserializer().deserialize("pi_adj",x))        
    val rdd3 = rdd2.map(x => new String(x,"UTF-8"))
    val df1 = spark.read.json(rdd3)
  /*some other transformations and inserting into hive*/
}

refer to this refer to this

Upvotes: 0

Jacek Laskowski
Jacek Laskowski

Reputation: 74679

tl;dr Move the code to create a Kafka DStream and foreach outside createContext and use it in main.

According to the scaladoc of StreamingContext:

Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. If checkpoint data exists in the provided checkpointPath, then StreamingContext will be recreated from the checkpoint data. If the data does not exist, then the StreamingContext will be created by called the provided creatingFunc.

And although it may not have been said clearly, creatingFunc to create a StreamingContext should only create a new StreamingContext possibly with checkpoint enabled. Nothing else.

You should move the code to create Kafka DStream and foreachRDD outside createContext and have it as part of main (right after ssc is initialized and before starting it).

Upvotes: 1

Related Questions