Tim

Reputation: 1471

Azure Databricks PySpark readStream reads non-ORC files from the mounted ADLS Gen2 input path

I am using Databricks PySpark readStream and writeStream as shown below.

I am reading ORC files from a mounted ADLS Gen2 Azure storage account. Below is a fragment of the code where the files are read from a location.

Say the input data is loaded by a process into the following layout:

my-storage-account/my-container/table1/input/
             |
             |- loadDate_20220621/
             |          |- file1.orc
             |          |- file2.orc
             |- sample.json
             |- file.json
domains = ['table1', 'table2']

data_schema = {}
for domain in domains:
   # load the table schema for each domain from a JSON file on the mount
   with open('{}/{}.json'.format(mount_path, domain), 'r') as jfile:
      table_schema = T.StructType.fromJson(json.loads(jfile.read()))
   data_schema[domain] = table_schema

data_readstreams = {}
for domain in domains:
  # read stream
  data_readstreams[domain] = (
    spark.readStream
    .format("orc")    # READING ORC FORMAT
    .schema(data_schema[domain])
    .load(f'{mount_path}/input/data/')
  )

## other operations to perform the merge of the data
## create merge upsert functions and store them in a dict

def mergeTbl1(df, batch_id):
  data_df = (
    df
    .dropDuplicates(['field1'])
  ) 
  data_df.createOrReplaceTempView("table1_temp")
 
  data_df._jdf.sparkSession().sql("""
    MERGE INTO my_db.table1 tbl1
    USING table1_temp tbl1u
    ON tbl1.field1 = tbl1u.field1
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *                     
    """)

def mergeTbl2(df, batch_id): 
  data_df = (
    df
    .dropDuplicates(['field2'])
  ) 
  data_df.createOrReplaceTempView("table2_temp")
 
  data_df._jdf.sparkSession().sql("""
    MERGE INTO my_db.table2 tbl2
    USING table2_temp tbl2u
    ON tbl2.field2 = tbl2u.field2
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *                     
    """)

mergeUpsertFunctionList = {}
mergeUpsertFunctionList['table1'] = mergeTbl1
mergeUpsertFunctionList['table2'] = mergeTbl2


for domain in domains:
  # write stream
  data_writestream = ( 
    data_readstreams[domain]
    .repartition(1)
    .writeStream
    .trigger(once=True)
    .format("delta")
    .option("checkpointLocation", f"{mount_path}/checkpoints/{domain}")
    .foreachBatch(mergeUpsertFunctionList[domain])
    .start()
  )

Below is the exception I get, which is strange: with the ORC format specified in readStream, I expect Spark to read only ORC files, not JSON.

Question:

Any reason why this is happening?

 File "/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/java_gateway.py", line 2442, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/databricks/spark/python/pyspark/sql/utils.py", line 202, in call
    raise e
  File "/databricks/spark/python/pyspark/sql/utils.py", line 199, in call
    self.func(DataFrame(jdf, self.sql_ctx), batch_id)
  File "<command-313423169960053>", line 15, in mergeDataTable1
    df
  File "/databricks/spark/python/pyspark/sql/readwriter.py", line 740, in save
    self._jwrite.save(path)
  File "/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/databricks/spark/python/pyspark/sql/utils.py", line 117, in deco
    return f(*a, **kw)
  File "/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o855.save.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:606)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:360)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:198)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:126)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:124)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:138)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:213)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:360)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:160)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:115)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:310)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:160)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:156)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:575)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:167)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:575)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:268)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:264)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:551)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:156)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:324)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:156)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:141)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:132)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:186)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:959)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:427)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:396)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:250)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 10) (10.44.216.76 executor 0): com.databricks.sql.io.FileReadException: Error while reading file dbfs:/mnt_path/input............../file.json.
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.logFileNameAndThrow(FileScanRDD.scala:521)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:494)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:614)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.$anonfun$hasNext$1(FileScanRDD.scala:356)
    at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:351)
    at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:1017)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:81)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:81)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:156)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:125)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:95)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:826)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1681)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:829)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:684)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.orc.FileFormatException: Malformed ORC file dbfs:/mount_path/input....../.../sample.json. Invalid postscript.
    at org.apache.orc.impl.ReaderImpl.ensureOrcFooter(ReaderImpl.java:462)
    at org.apache.orc.impl.ReaderImpl.extractFileTail(ReaderImpl.java:795)
    at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:564)
    at org.apache.orc.OrcFile.createReader(OrcFile.java:385)
    at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.$anonfun$buildReaderWithPartitionValues$2(OrcFileFormat.scala:146)
    at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:3035)
    at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.$anonfun$buildReaderWithPartitionValues$1(OrcFileFormat.scala:146)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:457)

Upvotes: 1

Views: 407

Answers (1)

Alex Ott

Reputation: 87069

There are a few approaches here:

  • Use the pathGlobFilter option so the stream only picks up files matching a given glob pattern (here, *.orc):

data_readstream = ( 
    spark.readStream
    .format("orc")    # READING ORC FORMAT
    .schema(data_schema[domain])
    .option("pathGlobFilter", "*.orc")
    .load(f'{mount_path}/input/data/')
)
  • Ignore files considered corrupted using the spark.sql.files.ignoreCorruptFiles Spark configuration (doc), although this is not an optimal solution (a minimal sketch follows below).
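
For example, a minimal sketch of that second approach, assuming the setting is applied to the session before the streaming query is started (mount_path, data_schema and domain come from the question):

# Skip files Spark cannot read (such as the stray JSON files) instead of failing the task
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

data_readstream = (
    spark.readStream
    .format("orc")
    .schema(data_schema[domain])
    .load(f'{mount_path}/input/data/')   # unreadable files in this path are now skipped
)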

Upvotes: 1
