Reputation: 65
When I stream data from a Delta table hosted on Azure Data Lake Storage (ADLS) Gen2, the stream works for a little while before failing with the error below. The error says the path doesn't exist, but I can see in the storage logs that files are successfully written to and read from that path both before and after the error. It seems safe to say that the path does exist in Azure Storage, despite the exception.
For context:
- The stream writes through a ForeachBatch sink.

Fixes I've tried:
- Changing the trigger from None (unset) to 10 seconds. After this, the query went from failing after ~15 minutes with the error below to failing after a little over an hour.

I found one other person with this error, but no solution was provided since it was asked to the wrong audience: https://github.com/delta-io/delta/issues/932. Based on that issue, it seems a simple repro can be made just by reading and writing Spark streams to a Delta table hosted on ADLS Gen2 (a rough sketch follows).
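A minimal sketch of that kind of repro is below. All paths, names, and the sink logic are placeholders I've made up for illustration; only the ForeachBatch sink and the 10-second trigger reflect my actual setup.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

object DeltaAdlsStreamRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("delta-adls-stream-repro")
      .getOrCreate()

    // Placeholder abfss paths -- substitute your own container and directories.
    val sourcePath     = "abfss://workspace@example.dfs.core.windows.net/warehouse/my_table"
    val sinkPath       = "abfss://workspace@example.dfs.core.windows.net/warehouse/my_table_copy"
    val checkpointPath = "abfss://workspace@example.dfs.core.windows.net/checkpoints/my_table_copy"

    // Stream from the source Delta table on ADLS Gen2.
    val sourceStream = spark.readStream
      .format("delta")
      .load(sourcePath)

    // ForeachBatch sink: append each micro-batch to another Delta table.
    // (Declared as a typed val to avoid overload ambiguity with the Java foreachBatch variant.)
    val writeBatch: (DataFrame, Long) => Unit = (batchDf, batchId) => {
      batchDf.write
        .format("delta")
        .mode("append")
        .save(sinkPath)
    }

    val query = sourceStream.writeStream
      .foreachBatch(writeBatch)
      // Trigger changed from unset to a 10-second processing-time trigger.
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .option("checkpointLocation", checkpointPath)
      .start()

    query.awaitTermination()
  }
}
```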
How can I pin down the root cause? Are there any Spark or ADLS settings I can change to mitigate this?
22/03/19 02:06:20 ERROR MicroBatchExecution: Query [id = 00f1d866-74a2-42f9-8fb6-c8d1a76e00a6, runId = 902f8480-4dc6-4a7d-aada-bfe3b660d288] terminated with error
java.io.FileNotFoundException: Operation failed: "The specified path does not exist.", 404, GET, https://example.dfs.core.windows.net/workspace?upn=false&resource=filesystem&maxResults=5000&directory=synapse/workspaces/example/warehouse/my_table/_delta_log&timeout=90&recursive=false, PathNotFound, "The specified path does not exist. RequestId:e8c6fb8e-101f-00cb-5c35-3b717e000000 Time:2022-03-19T02:06:20.8352277Z"
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.checkException(AzureBlobFileSystem.java:1178)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.listStatus(AzureBlobFileSystem.java:408)
at org.apache.spark.sql.delta.storage.HadoopFileSystemLogStore.listFrom(HadoopFileSystemLogStore.scala:69)
at org.apache.spark.sql.delta.DeltaLog.getChanges(DeltaLog.scala:227)
at org.apache.spark.sql.delta.sources.DeltaSource.filterAndIndexDeltaLogs$1(DeltaSource.scala:190)
at org.apache.spark.sql.delta.sources.DeltaSource.getFileChanges(DeltaSource.scala:203)
at org.apache.spark.sql.delta.sources.DeltaSourceBase.getFileChangesAndCreateDataFrame(DeltaSource.scala:117)
at org.apache.spark.sql.delta.sources.DeltaSourceBase.getFileChangesAndCreateDataFrame$(DeltaSource.scala:112)
at org.apache.spark.sql.delta.sources.DeltaSource.getFileChangesAndCreateDataFrame(DeltaSource.scala:144)
at org.apache.spark.sql.delta.sources.DeltaSource.getBatch(DeltaSource.scala:385)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$3(MicroBatchExecution.scala:486)
at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:27)
at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:27)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$2(MicroBatchExecution.scala:482)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:482)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:226)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: Operation failed: "The specified path does not exist.", 404, GET, https://example.dfs.core.windows.net/workspace?upn=false&resource=filesystem&maxResults=5000&directory=synapse/workspaces/example/warehouse/my_table/_delta_log&timeout=90&recursive=false, PathNotFound, "The specified path does not exist. RequestId:e8c6fb8e-101f-00cb-5c35-3b717e000000 Time:2022-03-19T02:06:20.8352277Z"
at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:207)
at org.apache.hadoop.fs.azurebfs.services.AbfsClient.listPath(AbfsClient.java:231)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.listStatus(AzureBlobFileSystemStore.java:905)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.listStatus(AzureBlobFileSystemStore.java:876)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.listStatus(AzureBlobFileSystemStore.java:858)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.listStatus(AzureBlobFileSystem.java:405)
... 37 more
Upvotes: 0
Views: 1081
Reputation: 10859
Please check the below points.

One possible cause is having multiple jobs writing to the same table, where one job is cleaning up while another is setting up, and the two get mixed up.

Note:
- To avoid the error to some extent, please make sure your jobs are not writing to the same table simultaneously.
- Work with the most recent version of Spark you can.
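As a small illustration of the second point (my own addition, not part of the original answer): confirm which Spark version the streaming job actually runs on, so it can be matched against a compatible, recent Delta Lake release before upgrading. This assumes an existing SparkSession named `spark`.

```scala
// Print the Spark version the job runs on; each delta-core release
// only supports a specific Spark minor version, so check compatibility.
println(s"Spark version: ${spark.version}")
```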
Upvotes: 0