Amanpreet Khurana

Reputation: 559

Spark not able to find checkpointed data in HDFS after executor fails

I am streaming data from Kafka as below:

    final JavaPairDStream<String, Row> transformedMessages = rtStream
            .mapToPair(record -> new Tuple2<String, GenericDataModel>(record.key(), record.value()))
            .mapWithState(StateSpec.function(updateDataFunc).numPartitions(32))
            .stateSnapshots();

    transformedMessages.foreachRDD(rdd -> {
        // logic goes here
    });

I have four worker nodes and multiple executors for this application, and I am trying to test Spark's fault tolerance.

Since we are using mapWithState, Spark checkpoints the state data to HDFS, so if any executor/worker goes down we should be able to recover the lost data (the data lost on the dead executor) and continue with the remaining executors/workers.
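
For reference, mapWithState requires a checkpoint directory to be set on the streaming context before the stream starts. A minimal sketch of that part of my setup (the app name, batch interval, and variable names here are illustrative placeholders, not my exact values; the HDFS path is the one that appears in the error below):

    SparkConf conf = new SparkConf().setAppName("KafkaStateStreaming");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(60));

    // mapWithState writes its state snapshots under this directory in HDFS
    jssc.checkpoint("hdfs://mycluster/user/user1/sparkCheckpointData");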

So I killed one of the worker nodes to see if the application still runs smoothly, but instead I get a FileNotFoundException from HDFS, as below:

This is a bit odd: Spark checkpointed the data to HDFS at some point, so why is it not able to find it? HDFS is obviously not deleting any data, so why this exception?

Or am I missing something here?

[ERROR] 2018-08-21 13:07:24,067 org.apache.spark.streaming.scheduler.JobScheduler logError - Error running job streaming job 1534871220000 ms.2
                org.apache.spark.SparkException: Job aborted due to stage failure: Task creation failed: java.io.FileNotFoundException: File does not exist: hdfs://mycluster/user/user1/sparkCheckpointData/2db59817-d954-41a7-9b9d-4ec874bc86de/rdd-1005/part-00000
                java.io.FileNotFoundException: File does not exist: hdfs://mycluster/user/user1/sparkCheckpointData/2db59817-d954-41a7-9b9d-4ec874bc86de/rdd-1005/part-00000
                        at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1122)
                at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
                at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
                at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
                at org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89)
                at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:273)
                at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:273)
                at scala.Option.map(Option.scala:146)
                at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:273)
                at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1615)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1626)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1625)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1625)
                at scala.collection.immutable.List.foreach(List.scala:381)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1625)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1623)

Further update: I found that the RDD that Spark is trying to find in HDFS has already been deleted by the "ReliableRDDCheckpointData" process, which created a new RDD for the checkpoint data. The DAG is somehow still pointing to this old RDD. Had there been any reference to this data, it should not have been deleted.

Upvotes: 3

Views: 2763

Answers (1)

Amanpreet Khurana

Reputation: 559

Consider this pipeline of transformations on a Spark stream:

    rtStream
            .mapToPair(record -> new Tuple2<String, GenericDataModel>(record.key(), record.value()))
            .mapWithState(StateSpec.function(updateDataFunc).numPartitions(32))
            .stateSnapshots()
            .foreachRDD(rdd -> {
                if (counter == 1) {
                    // convert the RDD to a Dataset and register it as a SQL table named "InitialDataTable"
                } else {
                    // convert the RDD to a Dataset and register it as a SQL table named "ActualDataTable"
                }
            });
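
To make the commented branches concrete: each one just turns the snapshot RDD into a Dataset and registers a temporary view. A rough sketch of the body, assuming a SparkSession named spark and a StructType schema describing the state rows (both are illustrative names, not my exact code):

    // rdd is the JavaPairRDD<String, Row> produced by stateSnapshots()
    Dataset<Row> snapshot = spark.createDataFrame(rdd.values(), schema);

    if (counter == 1) {
        snapshot.createOrReplaceTempView("InitialDataTable");
    } else {
        snapshot.createOrReplaceTempView("ActualDataTable");
    }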

mapWithState automatically checkpoints the state data after every batch, so each RDD in the above foreachRDD block is checkpointed. While checkpointing, Spark overwrites the previous checkpoint (because, obviously, only the latest state needs to stay in the checkpoint).

But say the user is still using RDD number 1, as in my case, where I register the very first RDD as one table and every subsequent RDD as a different table; then its checkpoint should not be overwritten. (It is the same as in Java: as long as something holds a reference to an object, that object is not eligible for garbage collection.)

Now, when I try to access the table InitialDataTable, the RDD used to create this table is obviously no longer in memory, so Spark goes to HDFS to recover it from the checkpoint. It does not find it there either, because it was overwritten by the very next RDD, and the Spark application stops, citing the reason:

org.apache.spark.SparkException: Job aborted due to stage failure: Task creation failed: java.io.FileNotFoundException: File does not exist: hdfs://mycluster/user/user1/sparkCheckpointData/2db59817-d954-41a7-9b9d-4ec874bc86de/rdd-1005/part-00000

So to resolve this issue, I had to checkpoint the very first RDD explicitly.
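
In code terms, that means caching the first snapshot RDD and calling checkpoint() on it inside foreachRDD, so it gets its own copy in HDFS that the automatic mapWithState checkpointing cannot overwrite. A sketch of the idea (again with illustrative names, not my exact code):

    .foreachRDD(rdd -> {
        if (counter == 1) {
            // keep the very first snapshot alive on its own: cache it and write an
            // explicit checkpoint, separate from the automatic mapWithState checkpoint
            rdd.cache();
            rdd.checkpoint();
            rdd.count();   // an action is needed so the checkpoint is actually written out
            // ...convert to a Dataset and register "InitialDataTable" as before
        } else {
            // ...register "ActualDataTable" as before
        }
    });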

Upvotes: 3
