Reputation: 11
I wrote an app that reads data from a Kafka topic, and I can't achieve fault tolerance in the event of a driver failure. The application runs in a k8s cluster using spark-submit. When I run the application for the first time everything works fine, but after I delete the pod from the cluster, restarting the application fails with the error Checkpoint directory does not exist: file:/opt/spark/.../rdd-541
. I use fault-tolerant storage. Below is a piece of the code and the error in more detail. Thanks for the help. Let me know if you need more details.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# cpd (checkpoint directory path), parse_reduce and found are defined elsewhere in the script

def functionToCreateContext():
    sc = SparkContext("spark-master", "kafka_spark")
    sc.setLogLevel('ERROR')
    ssc = StreamingContext(sc, 20)  # 20-second batch interval
    kafkaParams = {'bootstrap.servers': 'kafka.cluster'}
    kafkaStream = KafkaUtils.createDirectStream(ssc, ['topic'], kafkaParams)
    statistic_window = kafkaStream.transform(parse_reduce).reduceByKeyAndWindow(lambda x, y: x + y,
                                                                                lambda x, y: x - y,
                                                                                60, 20)
    top = statistic_window.transform(found)
    top.pprint()
    ssc.checkpoint(cpd)
    return ssc


if __name__ == "__main__":
    ssc = StreamingContext.getOrCreate(cpd, functionToCreateContext)
    ssc.start()
    ssc.awaitTermination()
traceback:
20/03/10 14:05:19 INFO SparkContext: Created broadcast 1 from checkpointFile at DStreamCheckpointData.scala:114
Traceback (most recent call last):
File "/app-spark/kafka_spark.py", line 75, in <module>
ssc = StreamingContext.getOrCreate(cpd, functionToCreateContext)
File "/opt/spark/python/lib/pyspark.zip/pyspark/streaming/context.py", line 105, in getOrCreate
File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o2.tryRecoverFromCheckpoint.
: java.lang.IllegalArgumentException: requirement failed: Checkpoint directory does not exist: file:/opt/spark/.../rdd-541
at scala.Predef$.require(Predef.scala:224)
Upvotes: 1
Views: 196
Reputation: 4655
I guess you are getting this error because your checkpoint directory is not on a persistent volume. So when you delete the pod, the specified directory is deleted along with it, and that is why you get the Checkpoint directory does not exist
error.
The solution would be to use a Persistent Volume for the checkpoint directory.
Here you can also find an example on exactly the same topic.
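As a rough sketch (the claim name checkpoint-pvc, the mount path /mnt/checkpoints, the master URL and the container image below are placeholders, not values taken from your setup), mounting a PersistentVolumeClaim into the driver and executor pods at submit time could look like this:

# Mount an existing PVC into both driver and executor pods and run the app in cluster mode
spark-submit \
  --master k8s://https://<k8s-apiserver>:443 \
  --deploy-mode cluster \
  --conf spark.kubernetes.container.image=<your-spark-image> \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointvol.options.claimName=checkpoint-pvc \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointvol.mount.path=/mnt/checkpoints \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointvol.options.claimName=checkpoint-pvc \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointvol.mount.path=/mnt/checkpoints \
  kafka_spark.py

With that, cpd in the application would point at the mounted path (for example file:///mnt/checkpoints). Keep in mind that windowed streams also checkpoint RDD data from the executors, so the volume (or whatever storage backs it) needs to be readable and writable from both the driver and the executor pods.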
Upvotes: 1