Alessio

Reputation: 31

Failure when creating a table during job run on Databricks

I am doing a left join of two tables in SQL (table A contains billions of rows and table B contains millions of rows) and creating a new table (table C) from the result of the join. I am using an XXL SQL Warehouse on Databricks with two workers to handle the compute for this operation.

The main issue is that the job fails every time it runs on a schedule, yet it finishes almost every time when I run it manually.

DROP TABLE IF EXISTS tableC;
CREATE TABLE tableC
AS (
    SELECT * 
    FROM tableA
    LEFT JOIN tableB
    ON tableA.id = tableB.id
    )

After running the SQL query as a scheduled job, I get the error below. As far as I can tell, the error occurs in the final stage of the job, when it writes the rows into a Delta table, but I can't figure out the reason for the failure. I have checked that the data is not corrupted and that I have enough memory. What puzzles me most is that the SQL command works when run manually but always fails when the query is part of a scheduled job run.

Job aborted due to stage failure: ResultStage 9 (write at WriteIntoDeltaCommand.scala:70) has failed the maximum allowable number of times: 4. Most recent failure reason:
org.apache.spark.shuffle.FetchFailedException
at org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:315)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1167)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.$anonfun$next$1(ShuffleBlockFetcherIterator.scala:905)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:736)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:85)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.execution.RowToColumnarExec$$anon$1.hasNext(Columnar.scala:491)
at com.databricks.photon.CloseableIterator$$anon$5.hasNext(CloseableIterator.scala:62)
at com.databricks.photon.NativeColumnBatchIterator.hasNext(NativeColumnBatchIterator.java:47)
at 0x8235362 <photon>.HasNext(external/workspace_spark_3_3/photon/jni-wrappers/jni-native-column-batch-iterator.cc:62)
at 0x4707a79 <photon>.OpenImpl(external/workspace_spark_3_3/photon/exec-nodes/file-writer-node.cc:161)
at com.databricks.photon.JniApiImpl.open(Native Method)
at com.databricks.photon.JniApi.open(JniApi.scala)
at com.databricks.photon.JniExecNode.open(JniExecNode.java:64)
at com.databricks.photon.PhotonWriteStageExec.$anonfun$executeWrite$5(PhotonWriteStageExec.scala:87)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.photon.PhotonExec.timeit(PhotonExec.scala:300)
at com.databricks.photon.PhotonExec.timeit$(PhotonExec.scala:298)
at com.databricks.photon.PhotonWriteStageExec.timeit(PhotonWriteStageExec.scala:38)
at com.databricks.photon.PhotonWriteStageExec.$anonfun$executeWrite$4(PhotonWriteStageExec.scala:87)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1730)
at com.databricks.photon.PhotonWriteStageExec.$anonfun$executeWrite$2(PhotonWriteStageExec.scala:84)
at com.databricks.photon.PhotonExec.$anonfun$executePhoton$2(PhotonExec.scala:484)
at com.databricks.photon.PhotonExec.$anonfun$executePhoton$2$adapted(PhotonExec.scala:353)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:882)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:882)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:372)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:336)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:169)
at org.apache.spark.scheduler.Task.$anonfun$run$4(Task.scala:137)
at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:104)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:137)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:96)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:902)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1696)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:905)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:760)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.nio.channels.ClosedChannelException
at org.apache.spark.network.client.StreamInterceptor.channelInactive(StreamInterceptor.java:62)
at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:223)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 more

Upvotes: 3

Views: 773

Answers (1)

BARATH

Reputation: 33

Based on the stack trace, there are two exceptions to look at:

org.apache.spark.shuffle.FetchFailedException (the operation that failed)
Caused by: java.nio.channels.ClosedChannelException (the root cause)

The problem is possibly that the default retry or timeout configuration is not suitable for complex workloads. Check out this article and try its recommendations, mainly those under the 'Network Timeout' section: https://towardsdatascience.com/fetch-failed-exception-in-apache-spark-decrypting-the-most-common-causes-b8dff21075c
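As a rough sketch of what tuning those settings could look like: the three keys below are standard Spark properties (spark.network.timeout, spark.shuffle.io.maxRetries, spark.shuffle.io.retryWait), but the values are only illustrative guesses, and the SHUFFLE_TUNING dictionary and to_spark_conf_lines helper are hypothetical names made up for this example. These properties normally have to be set when the cluster starts, so this assumes the query runs on a jobs cluster whose Spark config you can edit; I am not sure a SQL Warehouse lets you change them.

```python
# Standard Spark properties related to shuffle fetch failures.
# The values are illustrative assumptions, not recommendations; tune them for your workload.
SHUFFLE_TUNING = {
    "spark.network.timeout": "800s",      # Spark default is 120s
    "spark.shuffle.io.maxRetries": "10",  # Spark default is 3
    "spark.shuffle.io.retryWait": "30s",  # Spark default is 5s
}


def to_spark_conf_lines(conf):
    """Render settings as one 'key value' pair per line (hypothetical helper).

    This is the space-separated format accepted by the Spark config box
    in a Databricks cluster's advanced options.
    """
    return "\n".join(f"{key} {value}" for key, value in sorted(conf.items()))


if __name__ == "__main__":
    print(to_spark_conf_lines(SHUFFLE_TUNING))
```

Paste the printed lines into the cluster's Spark config, rerun the scheduled job, and check whether the FetchFailedException still appears in the write stage.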

Upvotes: 2
