Reputation: 101
I have a Spark Structured Streaming application with checkpointing that writes its output in Parquet, using the default spark.sql.shuffle.partitions = 200. I need to change the number of shuffle partitions, but the new value is not used. Here is the content of a checkpoint offset file after the application is restarted:
{"batchWatermarkMs":1520054221000,"batchTimestampMs":1520054720003,"conf":{"spark.sql.shuffle.partitions":"200"}}
Do I need to set the number of partitions in the code instead of setting it with --conf?
Upvotes: 5
Views: 1761
Reputation: 600
If you need to restore your application from the point where it stopped while also changing the number of shuffle partitions, you have two options:
Use the startingOffsets option on the source
Open the latest offset file, make note of the offsets, and delete the checkpoint directory. For example, with 10 partitions in the source topic:
"startingOffsets": "{\"topicAName\":{\"9\":2528,\"7\":2633,\"0\":2705,\"3\":2018,\"5\":2749,\"6\":2857,\"2\":2145,\"8\":1916,\"1\":2094,\"4\":1932}}"
Pass the above value as the startingOffsets option when reading from the source.
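A minimal sketch of building that startingOffsets value from the noted offsets, assuming a Kafka source (the commented readStream call is illustrative, and the topic name/offsets are the ones above):

```python
import json

# Offsets noted from the latest file under <checkpoint>/offsets
# before the checkpoint directory was deleted.
noted_offsets = {
    "topicAName": {
        "0": 2705, "1": 2094, "2": 2145, "3": 2018, "4": 1932,
        "5": 2749, "6": 2857, "7": 2633, "8": 1916, "9": 2528,
    }
}

# startingOffsets expects a JSON string mapping topic -> partition -> offset.
starting_offsets = json.dumps(noted_offsets)
print(starting_offsets)

# With a fresh checkpoint location, pass it to the Kafka source, e.g.:
# spark.readStream.format("kafka") \
#     .option("subscribe", "topicAName") \
#     .option("startingOffsets", starting_offsets) \
#     .load()
```

The query then resumes reading where the old one stopped, while the new checkpoint records your changed spark.sql.shuffle.partitions.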
Modify the checkpoint files directly (works only if your application has no state; not recommended unless you are very sure and there is no other way)
Keep only the latest offset file in the offsets directory and edit the conf entries inside it, changing
"spark.sql.shuffle.partitions":"200" to the value you choose.
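A sketch of that edit, assuming the v1 offset file layout (a version header, then the batch-metadata JSON line shown in the question, then one offset line per source); the file name 42, the sample contents, and the target value 300 are placeholders:

```python
import json
from pathlib import Path

# Hypothetical checkpoint layout; point this at your latest offsets file.
offset_file = Path("checkpoint/offsets/42")

# Create a sample offsets file in the shape Structured Streaming writes:
# version header, batch metadata JSON, then per-source offsets.
offset_file.parent.mkdir(parents=True, exist_ok=True)
offset_file.write_text(
    "v1\n"
    '{"batchWatermarkMs":1520054221000,"batchTimestampMs":1520054720003,'
    '"conf":{"spark.sql.shuffle.partitions":"200"}}\n'
    '{"topicAName":{"0":2705}}\n'
)

lines = offset_file.read_text().splitlines()
metadata = json.loads(lines[1])  # second line holds the batch metadata
metadata["conf"]["spark.sql.shuffle.partitions"] = "300"  # new value
lines[1] = json.dumps(metadata, separators=(",", ":"))
offset_file.write_text("\n".join(lines) + "\n")

print(offset_file.read_text())
```

On the next restart against this checkpoint, Spark reads the edited conf instead of the old 200.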
Upvotes: 0
Reputation: 149636
The number of shuffle partitions is restored from the checkpoint; it will only change if you delete the checkpointed data and restart the query with a "clean slate".
This makes sense: if you have checkpointed data, Spark needs to know how many partition directories to restore the previous state from.
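For the common case where the checkpointed state can be discarded, the clean-slate restart is just the following CLI fragment (path, value, and application name are placeholders):

```shell
# Remove the old checkpoint so the conf snapshot inside it is not restored.
rm -r /path/to/checkpoint

# Restart with the new value; it gets recorded in the fresh checkpoint.
spark-submit --conf spark.sql.shuffle.partitions=300 my_streaming_app.py
```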
Upvotes: 8