Reputation: 1197
I am trying to run some experiments/tests on checkpointing for learning purposes, but I have limited options to observe how the internals work. I am reading from a socket:
val lines: DataFrame = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 12345)
  .load()
and then performing some stateful operations on it, for which I need checkpointing.
Q1. When I use a checkpoint location on my local file system, the query is not able to read the checkpoint back and gives this error:
This query does not support recovering from checkpoint location. Delete src/testC/offsets to start over.;
It creates a new checkpoint every time the query runs. How can I use my local file system as the checkpoint location for testing/experimenting purposes?
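For illustration, my write looked roughly like this (the checkpoint path is the one from the error above; the console sink is just a placeholder):
// Sketch of the failing query; "src/testC" is taken from the error
// message, the console sink is assumed for illustration.
val query = lines.writeStream
  .format("console")
  .option("checkpointLocation", "src/testC")
  .start()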
(So I switched to HDFS.)
Q2. When I use HDFS as the checkpoint location, it creates the checkpoint on my local file system instead of HDFS. How do I make it checkpoint to HDFS? (I passed the HDFS config, by the way.)
df.writeStream
  .option("checkpointLocation", "/mycheckLoc")
  .option("hdfs_url", "hdfs://localhost:9000/hdoop")
  .option("web_hdfs_url", "webhdfs://localhost:9870/hdoop")
Q3. Do we need to provide a checkpoint location in the options of every df.writeStream? We can also pass it via spark.sparkContext.setCheckpointDir(checkpointLocation), right?
Upvotes: 1
Views: 984
Reputation: 18515
You are getting the error "This query does not support recovering from checkpoint location" because a socket readStream is not a replayable source and hence does not allow any use of checkpointing. Make sure not to use the checkpointLocation option at all in your writeStream.
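For a quick local experiment without recovery, a minimal sketch (assuming the lines DataFrame from the question) would be:
// Socket source written to the console with NO checkpointLocation,
// since the socket source is not replayable. Feed it with e.g.
// `nc -lk 12345` in a terminal.
val query = lines.writeStream
  .format("console")
  .outputMode("append")
  .start()

query.awaitTermination()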
Typically, you differentiate between a local file system location and an HDFS location by using either file:///path/to/dir or hdfs:///path/to/dir.
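For example (df and the paths below are placeholders, not taken from the question, apart from the namenode URL and /mycheckLoc):
// Checkpoint on the local file system (hypothetical paths):
df.writeStream
  .format("parquet")
  .option("path", "file:///tmp/out")
  .option("checkpointLocation", "file:///tmp/checkpoints/my-query")
  .start()

// Checkpoint on HDFS, using the namenode URL from the question:
df.writeStream
  .format("parquet")
  .option("path", "hdfs://localhost:9000/out")
  .option("checkpointLocation", "hdfs://localhost:9000/mycheckLoc")
  .start()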
Make sure that your application user has all the rights to read and write these locations. Also, you may have changed the code base, in which case the application cannot recover from the checkpoint files. You can read about the allowed and disallowed changes to a Structured Streaming job in the Structured Streaming Programming Guide under Recovery Semantics after Changes in a Streaming Query.
In order to make Spark aware of your HDFS, you need to include two Hadoop configuration files on Spark's classpath: core-site.xml and hdfs-site.xml.
Usually, they are stored in "/etc/hadoop/conf". To make these files visible to Spark, you need to set HADOOP_CONF_DIR
in $SPARK_HOME/conf/spark-env.sh
to the location containing the configuration files.
[Source from the book "Spark - The definitive Guide"]
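For example, a single line in $SPARK_HOME/conf/spark-env.sh would do (assuming the default Hadoop config directory mentioned above):
# Make core-site.xml and hdfs-site.xml visible to Spark
export HADOOP_CONF_DIR=/etc/hadoop/conf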
"Do we need to provide checkpoint in every
df.writeStream
options, i.e. We can also pass inspark.sparkContext.setCheckpointDir(checkpointLocation)
right?"
Theoretically, you could set the checkpoint location centrally for all queries in your SQLContext, but it is highly recommended to set a unique checkpoint location for every single stream. The Databricks blog on Structured Streaming in Production says:
"This checkpoint location preserves all of the essential information that uniquely identifies a query. Hence, each query must have a different checkpoint location, and multiple queries should never have the same location.
"As a best practice, we recommend that you always specify the checkpointLocation option."
Upvotes: 1