user1373186

Reputation: 101

new spark.sql.shuffle.partitions value not used after checkpointing

I have a Spark Structured Streaming application with checkpointing that writes its output as Parquet, using the default spark.sql.shuffle.partitions = 200. I need to change the number of shuffle partitions, but the new value is not picked up. Here is the content of a checkpoint offsets file after the application is restarted:

{"batchWatermarkMs":1520054221000,"batchTimestampMs":1520054720003,"conf":{"spark.sql.shuffle.partitions":"200"}}

Do I need to set the number of partitions in the code instead of setting it with --conf?
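For reference, the value pinned in the checkpoint can be read back directly from the metadata line shown above; a minimal Python sketch (the JSON line is the one from the question; real offset-log files also carry a version header line such as `v1` before it):

```python
import json

# The batch-metadata line from the checkpoint offsets file shown above.
line = ('{"batchWatermarkMs":1520054221000,"batchTimestampMs":1520054720003,'
        '"conf":{"spark.sql.shuffle.partitions":"200"}}')

meta = json.loads(line)
# This is the value Spark will keep using on restart, regardless of --conf.
pinned = meta["conf"]["spark.sql.shuffle.partitions"]
print(pinned)
```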

Upvotes: 5

Views: 1761

Answers (2)

Vinay KV

Reputation: 600

If you need to restore your application from the point where it stopped while also changing the number of shuffle partitions, you have two options.

Use the startingOffsets option in the source configuration

  1. Open the latest offset file, make note of the offsets, and delete the checkpoint directory. In this example there are 10 partitions in the source topic, so the value passed as the startingOffsets JSON string is:

    startingOffsets = {"topicAName":{"9":2528,"7":2633,"0":2705,"3":2018,"5":2749,"6":2857,"2":2145,"8":1916,"1":2094,"4":1932}}

  2. Pass the above config in the Spark configuration
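The steps above can be sketched as follows; the topic name and offset numbers are the example values from this answer, and the resulting string is what you would pass to the Kafka source's `startingOffsets` reader option:

```python
import json

# Offsets noted from the latest checkpoint offset file (example values
# from above: 10 partitions of "topicAName").
noted_offsets = {"topicAName": {"0": 2705, "1": 2094, "2": 2145, "3": 2018,
                                "4": 1932, "5": 2749, "6": 2857, "7": 2633,
                                "8": 1916, "9": 2528}}

# Serialize to the JSON string the Kafka source expects, e.g.:
#   spark.readStream.format("kafka").option("startingOffsets", starting_offsets)
starting_offsets = json.dumps(noted_offsets)
```

After deleting the old checkpoint directory and restarting with this option (and the new shuffle-partitions value), the query resumes from these offsets instead of from `latest`/`earliest`.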

Modify the checkpoint files (works only if your application keeps no state; not recommended unless you are very sure and there is no other way)

  1. Keep only the latest offset file in the checkpoint's offsets directory and modify the conf in it, changing

    "spark.sql.shuffle.partitions":"200"

    to the value you choose
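A sketch of that in-place edit, under the assumption that the offsets file follows Spark's offset-log layout (a version header line such as `v1`, then JSON lines, the first of which holds the batch metadata with the `conf` map); the path and new value are placeholders, and, as noted above, this is only viable for a stateless query:

```python
import json

def patch_shuffle_partitions(path, new_value):
    """Rewrite spark.sql.shuffle.partitions in a checkpoint offsets file.

    Assumes the offset-log layout: a version header line (e.g. "v1")
    followed by JSON lines, the first JSON line carrying the batch
    metadata with its "conf" map.
    """
    with open(path) as f:
        lines = f.read().splitlines()
    for i, line in enumerate(lines):
        if line.startswith("{"):
            meta = json.loads(line)
            if "conf" in meta:
                meta["conf"]["spark.sql.shuffle.partitions"] = str(new_value)
                lines[i] = json.dumps(meta, separators=(",", ":"))
            break  # only the first JSON line holds the metadata
    with open(path, "w") as f:
        f.write("\n".join(lines) + "\n")
```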

Upvotes: 0

Yuval Itzchakov

Reputation: 149636

The number is restored from the checkpoint; it will only change if you delete the checkpointed data and restart the application with a "clean slate".

This makes sense: with checkpointed data present, Spark needs to know how many partition directories it has to restore the previous state from.
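In practice the "clean slate" restart amounts to removing the checkpoint directory and resubmitting with the new value; a sketch (the paths and application name are placeholders, and deleting the checkpoint also discards saved offsets and state):

```shell
# Placeholder paths -- adjust to your deployment.
# Deleting the checkpoint discards offsets and state, so the query
# restarts from scratch (or from startingOffsets, if you set it).
hdfs dfs -rm -r /checkpoints/my-query

spark-submit \
  --conf spark.sql.shuffle.partitions=300 \
  my_streaming_app.jar
```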

Upvotes: 8
