supernatural

Reputation: 1197

Spark Structured Streaming - This query does not support recovering from checkpoint location

I am trying to do some experiments/tests with checkpointing for learning purposes.

But I have limited options for seeing how the internals work. I am trying to read from a socket.

val lines: DataFrame = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 12345)
  .load()

and do some stateful operations on it, for which I need checkpointing.
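Roughly, the kind of query I am running looks like this (just a sketch: a simple count per value, checkpointed to the local path that shows up in the error below):

val counts = lines
  .groupBy("value")   // simple stateful aggregation for the experiment
  .count()

counts.writeStream
  .format("console")
  .outputMode("complete")
  .option("checkpointLocation", "src/testC")   // local checkpoint directory
  .start()
  .awaitTermination()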

Q1. Using a checkpoint location on my local file system, it is not able to read the checkpoint back and gives an error:

This query does not support recovering from checkpoint location. Delete src/testC/offsets to start over.;

It creates a new checkpoint every time the query runs. How can I use my local file system for checkpointing for testing/experimenting purposes?

(So I went for HDFS.)

Q2. When using HDFS as the checkpoint location, the checkpoint is created on my local file system instead of HDFS. How do I make it checkpoint to HDFS? (I passed the HDFS config, by the way.)

df.writeStream
  .option("checkpointLocation", "/mycheckLoc")
  .option("hdfs_url", "hdfs://localhost:9000/hdoop")
  .option("web_hdfs_url", "webhdfs://localhost:9870/hdoop")

Q3. Do we need to provide the checkpoint location in every df.writeStream's options, or can we also just pass spark.sparkContext.setCheckpointDir(checkpointLocation)?

Upvotes: 1

Views: 984

Answers (1)

Michael Heil

Reputation: 18515

You are getting the error "This query does not support recovering from checkpoint location" because a socket readStream is not a replayable source and hence does not allow any use of checkpointing. You need to make sure not to use the option checkpointLocation at all in your writeStream.
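A minimal sketch of what that looks like with the socket stream from your question, writing to the console sink and not setting any checkpointLocation (Spark manages a temporary checkpoint internally in that case):

// No checkpointLocation: the socket source cannot be replayed, so recovery is not supported anyway
lines
  .groupBy("value")
  .count()
  .writeStream
  .format("console")
  .outputMode("complete")
  .start()
  .awaitTermination()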

Typically, you differentiate between a local file system location and an HDFS location by using either file:///path/to/dir or hdfs:///path/to/dir.
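For example, a sketch of both variants (assuming df is backed by a replayable source such as files or Kafka, and reusing the NameNode address and directory name from your question):

// Checkpoint explicitly on HDFS
df.writeStream
  .format("console")
  .option("checkpointLocation", "hdfs://localhost:9000/mycheckLoc")
  .start()

// Or a directory on the local file system, e.g. for quick experiments:
//   .option("checkpointLocation", "file:///tmp/mycheckLoc")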

Make sure that your application user has the rights to read and write these locations. Also, you may have changed the code base, in which case the application cannot recover from the checkpoint files. You can read about the allowed and disallowed changes in a Structured Streaming job in the Structured Streaming Programming Guide, section "Recovery Semantics after Changes in a Streaming Query".

In order to make Spark aware of your HDFS you need to include two Hadoop configuration files on Spark's classpath:

  • hdfs-site.xml which provides default behaviors for the HDFS client; and
  • core-site.xml which sets the default file system name.

Usually, they are stored in "/etc/hadoop/conf". To make these files visible to Spark, you need to set HADOOP_CONF_DIR in $SPARK_HOME/spark-env.sh to a location containing the configuration files.
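Alternatively, for quick local experiments (my own suggestion, not from the book), you can set the default file system programmatically on the running session instead of shipping core-site.xml:

// Programmatic equivalent of core-site.xml's fs.defaultFS entry (set before starting the query)
spark.sparkContext.hadoopConfiguration.set("fs.defaultFS", "hdfs://localhost:9000")
// After this, a checkpointLocation of "hdfs:///mycheckLoc" resolves against that NameNode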

[Source: the book "Spark: The Definitive Guide"]

"Do we need to provide checkpoint in every df.writeStream options, i.e. We can also pass in spark.sparkContext.setCheckpointDir(checkpointLocation) right?"

Theoretically, you could set the checkpoint location centrally for all queries in your SQLContext, but it is highly recommended to set a unique checkpoint location for every single stream (see the sketch below the quote). The Databricks blog on Structured Streaming in Production says:

"This checkpoint location preserves all of the essential information that uniquely identifies a query. Hence, each query must have a different checkpoint location, and multiple queries should never have the same location.

"As a best practice, we recommend that you always specify the checkpointLocation option."

Upvotes: 1
