DaveP

Reputation: 298

pyspark df.write fails on file creation

When writing a dataframe, pyspark creates the output directory and a temporary dir inside it, but no files. CSV and Parquet formats fail with similar errors:

```
Py4JJavaError: An error occurred while calling o99.save: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:496)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:251)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
    at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1215)
    at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1420)
    at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
    at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
    at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:182)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$20(FileFormatWriter.scala:240)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:605)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:240)
    ... 41 more
```

My Code:

```
output_path = ".data/output/FireIncidents/"


# fire_df.write.format("parquet").mode("overwrite").save(output_path)# fails, makes folder but no files.  
# fire_df.write.parquet(output_path, mode='overwrite')  # fails, creates folder but no file.
# fire_df.write.csv(output_path, mode='overwrite')  # fails, makes folder but no files. 
# fire_df.write.save(output_path,format='parquet')  # fails, makes folder but no files. 
# fire_df.write.format("parquet").saveAsTable("tab1")  # fails, makes folder but no files. 
# fire_df.select("incident_number","call_datetime","incident_address").write.save("namesAndFavColors.parquet") # from the documentation, same effect
df_writer = pyspark.sql.DataFrameWriter(fire_df)
df_writer = df_writer.saveAsTable('test', format='parquet', mode='overwrite',path=output_path)

type(fire_df) == pyspark.sql.dataframe.DataFrame

fire_df.select('incident_number', 'call_datetime','incident_address').show()
```

```
+---------------+--------------------+--------------------+
|incident_number|       call_datetime|    incident_address|
+---------------+--------------------+--------------------+
|     18-0032836|2018/09/06 12:13:...|  16888 RIVERVIEW ST|
|     19-0019239|2019/06/03 06:46:...| 18469 GREENVIEW AVE|
|     20-0010724|2020/04/05 10:44:...|        2930 CODY ST|
```
etc. 

Docs: <https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html>


I've tried all the variations above, multiple formats, and more than one version of Hadoop:

HADOOP_HOME = "c:\hadoop"

Hadoop 3.2.1 and 3.2.2 (tried both), PySpark 3.2.0.
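
For reference, a quick way to confirm which Spark and Hadoop versions the running session is actually using (a minimal sketch; the `_jvm` gateway access is an internal API, used here only as a diagnostic):

```
from pyspark.sql import SparkSession

# Print the Spark and Hadoop versions the active JVM reports.
spark = SparkSession.builder.getOrCreate()
print("Spark:", spark.version)
print("Hadoop:", spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion())
```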

A similar SO question, without resolution: pyspark creates output file as folder (note the comment where the asker mentions that the created dir is empty).

Upvotes: 1

Views: 6050

Answers (2)

Granger

Reputation: 4439

The fact that you are not getting the `java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.` error tells me that you have your HADOOP_HOME environment variable set up.

If the HADOOP_HOME directory does not have a "bin" subdirectory that contains the winutils.exe and hadoop.dll files, IIRC, you get a different error.
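
A quick way to sanity-check that layout from Python (a minimal sketch, assuming the usual %HADOOP_HOME%\bin layout; the fallback path is an assumption, adjust it to your install):

```
import os

# Check that winutils.exe and hadoop.dll are present under %HADOOP_HOME%\bin.
hadoop_home = os.environ.get("HADOOP_HOME", r"c:\hadoop")  # fallback path is an assumption
for name in ("winutils.exe", "hadoop.dll"):
    path = os.path.join(hadoop_home, "bin", name)
    print(path, "found" if os.path.isfile(path) else "MISSING")
```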

Tangent/FYI: When I was trying to figure out how to get those 2 Windows binaries, I tried these 3 locations:

  1. https://github.com/steveloughran/winutils/ (up through v3.0.0)
  2. https://github.com/cdarlint/winutils/ (the new spot; I'm on PySpark 3.2.1 and I picked the 3.2.2 build (the latest as of 2022-01-29).)
  3. https://github.com/kontext-tech/winutils/ (this had more recent builds, but I didn't end up using them)

So based on the above, I expect that what you're missing is having the $env:HADOOP_HOME\bin folder in your PATH. I ran the command below at an elevated PowerShell prompt, then restarted my environment (VS Code). After that, I was able to save Parquet files as expected.

```
[Environment]::SetEnvironmentVariable("PATH", "$env:PATH;$env:HADOOP_HOME\bin", "Machine")
```

And one last bit of context, here's the Python code I was testing with; the .csv was nothing special.

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sdf = spark.read.options(header=True).csv("baseline-test.csv")
sdf.write.mode("overwrite").parquet("baseline-test-parquet")
```

Upvotes: 2

Egor

Reputation: 1409

It looks like the Windows native IO libraries are absent.

Hadoop requires native libraries on Windows to work properly; that includes accessing the file:// filesystem, where Hadoop uses some Windows APIs to implement POSIX-like file access permissions.

This is implemented in HADOOP.DLL and WINUTILS.EXE

In particular, %HADOOP_HOME%\BIN\WINUTILS.EXE must be locatable.

If it is not, Hadoop or an application built on top of Hadoop will fail.
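
One common workaround is to set the environment before the SparkSession (and therefore the JVM) is created, so the native binaries are locatable. A minimal sketch, assuming a c:\hadoop install (adjust the path to yours):

```
import os

# Assumption: winutils.exe and hadoop.dll live under c:\hadoop\bin; change to match your install.
os.environ["HADOOP_HOME"] = r"c:\hadoop"
os.environ["PATH"] = os.environ["HADOOP_HOME"] + r"\bin;" + os.environ["PATH"]

# Build the session only after the environment is set, so the launched JVM inherits it.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
```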

See https://cwiki.apache.org/confluence/display/HADOOP2/WindowsProblems

And the winutils repo.

Upvotes: 3
