Amit Joshi

Reputation: 182

Once in a while, a Spark Structured Streaming write stream fails with IllegalStateException: Race while writing batch 4

I have multiple queries running in the same Spark Structured Streaming session. The queries write Parquet records to a Google Cloud Storage bucket and their checkpoints to the bucket as well.

val query1 = df1
  .select(col("key").cast("string"), from_json(col("value").cast("string"), schema, Map.empty[String, String]).as("data"))
  .select("key", "data.*")
  .writeStream.format("parquet").option("path", path).outputMode("append")
  .option("checkpointLocation", checkpoint_dir1)
  .partitionBy("key")/*.trigger(Trigger.ProcessingTime("5 seconds"))*/
  .queryName("query1").start()

val query2 = df2
  .select(col("key").cast("string"), from_json(col("value").cast("string"), schema, Map.empty[String, String]).as("data"))
  .select("key", "data.*")
  .writeStream.format("parquet").option("path", path).outputMode("append")
  .option("checkpointLocation", checkpoint_dir2)
  .partitionBy("key")/*.trigger(Trigger.ProcessingTime("5 seconds"))*/
  .queryName("query2").start()

Problem: Sometimes the job fails with java.lang.IllegalStateException: Race while writing batch 4

Logs:

Caused by: java.lang.IllegalStateException: Race while writing batch 4
    at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitJob(ManifestFileCommitProtocol.scala:67)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:187)
    ... 20 more
20/07/24 19:40:15 INFO SparkContext: Invoking stop() from shutdown hook

Upvotes: 0

Views: 1372

Answers (1)

zsxwing

Reputation: 20826

This error occurs because two writers are writing to the same output path. The file streaming sink doesn't support multiple writers; it assumes there is only one writer per path.

Hence, to fix this, make each query use its own output directory. When reading the data back, you can load each output directory and union them.
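Here is a minimal sketch of that fix. The directory names output_dir1 and output_dir2 are hypothetical placeholders (e.g. distinct GCS paths), not from your original code, and the column-parsing steps are elided:

// Each query writes to its own output directory, so no two writers share a path.
val query1 = df1
  .writeStream.format("parquet")
  .option("path", output_dir1)                   // distinct path for query1
  .option("checkpointLocation", checkpoint_dir1)
  .outputMode("append")
  .queryName("query1").start()

val query2 = df2
  .writeStream.format("parquet")
  .option("path", output_dir2)                   // distinct path for query2
  .option("checkpointLocation", checkpoint_dir2)
  .outputMode("append")
  .queryName("query2").start()

// Reading back: load each directory and union them (assumes `spark` is your SparkSession).
val combined = spark.read.parquet(output_dir1)
  .union(spark.read.parquet(output_dir2))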

You can also use a streaming sink that supports multiple concurrent writers, such as the Delta Lake library. It's also supported by Google Cloud; see https://cloud.google.com/blog/products/data-analytics/getting-started-with-new-table-formats-on-dataproc for instructions on using Delta Lake there. That page doesn't cover the streaming case, but all you need to do is change format("parquet") to format("delta") in your code.
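For example, here is a sketch of your first query rewritten for Delta. This assumes the Delta Lake library is on your cluster's classpath; everything except the format is unchanged from your question:

val query1 = df1
  .select(col("key").cast("string"), from_json(col("value").cast("string"), schema, Map.empty[String, String]).as("data"))
  .select("key", "data.*")
  .writeStream.format("delta")                   // was format("parquet")
  .option("path", path)
  .outputMode("append")
  .option("checkpointLocation", checkpoint_dir1)
  .partitionBy("key")
  .queryName("query1").start()

Because Delta uses a transaction log with optimistic concurrency control, both queries can safely append to the same path.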

Upvotes: 2
