Fault tolerance for Kafka Direct Stream does not work. Checkpoint directory does not exist

I wrote an app that reads data from a Kafka topic, and I can't achieve fault tolerance in the event of a driver failure. The application runs in a k8s cluster using spark-submit. When I run my application for the first time, everything goes well, but after I delete the pod from the cluster, restarting the application results in an error: Checkpoint directory does not exist: file:/opt/spark/.../rdd-541. I use fault-tolerant storage. Below is a piece of the code and the error in more detail. Thanks for the help. Let me know if you need more details.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# cpd is the checkpoint directory path; parse_reduce and found are
# transformation helpers defined elsewhere in the script.

def functionToCreateContext():
    sc = SparkContext("spark-master", "kafka_spark")
    sc.setLogLevel('ERROR')
    ssc = StreamingContext(sc, 20)  # 20-second batch interval
    kafkaParams = {'bootstrap.servers': 'kafka.cluster'}
    kafkaStream = KafkaUtils.createDirectStream(ssc, ['topic'], kafkaParams)
    # 60-second window sliding every 20 seconds, with an inverse reduce function
    statistic_window = kafkaStream.transform(parse_reduce).reduceByKeyAndWindow(
        lambda x, y: x + y, lambda x, y: x - y, 60, 20)
    top = statistic_window.transform(found)
    top.pprint()
    ssc.checkpoint(cpd)
    return ssc

if __name__ == "__main__":
    ssc = StreamingContext.getOrCreate(cpd, functionToCreateContext)
    ssc.start()
    ssc.awaitTermination()

traceback:

20/03/10 14:05:19 INFO SparkContext: Created broadcast 1 from checkpointFile at DStreamCheckpointData.scala:114
Traceback (most recent call last):
  File "/app-spark/kafka_spark.py", line 75, in <module>
    ssc = StreamingContext.getOrCreate(cpd, functionToCreateContext)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/streaming/context.py", line 105, in getOrCreate
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o2.tryRecoverFromCheckpoint.
: java.lang.IllegalArgumentException: requirement failed: Checkpoint directory does not exist: file:/opt/spark/.../rdd-541
        at scala.Predef$.require(Predef.scala:224)

Upvotes: 1

Views: 196

Answers (1)

alpert

Reputation: 4655

I guess you are getting this error because your checkpoint directory is not on a persistent volume. So when you delete the pod, the specified directory is deleted along with it, and you get that "Checkpoint directory does not exist" error.

The solution would be to use a Persistent Volume for the checkpoint directory.
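
A minimal sketch of how that might look at submit time, assuming Spark 2.4+ on Kubernetes (the volume name, the claim name checkpoint-pvc and the mount path /checkpoint below are placeholders, not values from your setup):

spark-submit \
  --deploy-mode cluster \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpoint.mount.path=/checkpoint \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpoint.options.claimName=checkpoint-pvc \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpoint.mount.path=/checkpoint \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpoint.options.claimName=checkpoint-pvc \
  ... \
  /app-spark/kafka_spark.py

Then point cpd in the application at the mounted path (e.g. file:///checkpoint). Note that the checkpoint files are written from several pods, so the claim needs ReadWriteMany access (or all pods on one node); a distributed store such as HDFS or S3 for the checkpoint directory works as well.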

Here you can also find an example on exactly the same topic.

Upvotes: 1
