Reputation: 266
# Creating PySpark Object
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("XMLParser").getOrCreate()
sc = spark.sparkContext
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoop_conf.set("fs.s3n.awsAccessKeyId", aws_key)
hadoop_conf.set("fs.s3n.awsSecretAccessKey", aws_secret)
Then I am able to read the file from my S3 bucket using the following code:
df = spark.read.format("xml").options(rootTag='returnResult', rowTag="query").load("s3n://bucketName/folder/file.xml")
But when I try to write it back to S3 as a Delta Lake table (Parquet files) with this code:
df.write.format("delta").mode('overwrite').save("s3n://bucket/folder/file")
I get this error:
Py4JJavaError: An error occurred while calling o778.save.
: java.io.IOException: The error typically occurs when the default LogStore implementation, that
is, HDFSLogStore, is used to write into a Delta table on a non-HDFS storage system.
In order to get the transactional ACID guarantees on table updates, you have to use the
correct implementation of LogStore that is appropriate for your storage system.
See https://docs.delta.io/latest/delta-storage.html " for details.
at org.apache.spark.sql.delta.DeltaErrors$.incorrectLogStoreImplementationException(DeltaErrors.scala:157)
at org.apache.spark.sql.delta.storage.HDFSLogStore.writeInternal(HDFSLogStore.scala:73)
at org.apache.spark.sql.delta.storage.HDFSLogStore.write(HDFSLogStore.scala:64)
at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$doCommit$1.apply$mcJ$sp(OptimisticTransaction.scala:434)
at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$doCommit$1.apply(OptimisticTransaction.scala:416)
at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$doCommit$1.apply(OptimisticTransaction.scala:416)
at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:152)
at org.apache.spark.sql.delta.OptimisticTransactionImpl$class.doCommit(OptimisticTransaction.scala:415)
at org.apache.spark.sql.delta.OptimisticTransaction.doCommit(OptimisticTransaction.scala:80)
at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$commit$1.apply$mcJ$sp(OptimisticTransaction.scala:326)
at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$commit$1.apply(OptimisticTransaction.scala:284)
at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$commit$1.apply(OptimisticTransaction.scala:284)
at com.databricks.spark.util.DatabricksLogging$class.recordOperation(DatabricksLogging.scala:77)
at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:80)
at org.apache.spark.sql.delta.metering.DeltaLogging$class.recordDeltaOperation(DeltaLogging.scala:103)
at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:80)
at org.apache.spark.sql.delta.OptimisticTransactionImpl$class.commit(OptimisticTransaction.scala:284)
at org.apache.spark.sql.delta.OptimisticTransaction.commit(OptimisticTransaction.scala:80)
at org.apache.spark.sql.delta.commands.WriteIntoDelta$$anonfun$run$1.apply(WriteIntoDelta.scala:67)
at org.apache.spark.sql.delta.commands.WriteIntoDelta$$anonfun$run$1.apply(WriteIntoDelta.scala:64)
at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:64)
at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:134)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: fs.AbstractFileSystem.s3n.impl=null: No AbstractFileSystem configured for scheme: s3n
at org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:160)
at org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:249)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:334)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:331)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:331)
at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:448)
at org.apache.spark.sql.delta.storage.HDFSLogStore.getFileContext(HDFSLogStore.scala:47)
at org.apache.spark.sql.delta.storage.HDFSLogStore.writeInternal(HDFSLogStore.scala:70)
... 53 more
I tried to follow the link given in the stack trace, but I am not able to figure out how to resolve this. Any help would be appreciated.
Upvotes: 5
Views: 17930
Reputation: 905
Using s3a instead of s3n works for me:
df.write.format("delta").mode('overwrite').save("s3a://bucket/folder/file")
Upvotes: 0
Reputation: 11
I found that I did not need to do anything special to the S3 location. I simply needed to write the Delta-format files to a folder in S3 that didn't already exist. If the folder already exists and has objects in it, I get the same error as the OP.
Here is my Spark session creation code (from an AWS Glue job):
from awsglue.context import GlueContext
from pyspark.context import SparkContext

# In an AWS Glue job, build the GlueContext from the running SparkContext
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session.builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
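For illustration, a write through such a session could look like the line below; the bucket and prefix are placeholders, and per the note above the target prefix should not already contain objects:
df.write.format("delta").mode("overwrite").save("s3a://bucket/new-empty-prefix")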
Upvotes: 0
Reputation: 5526
After creating the Spark session, you need to add the configuration provided by Databricks to enable S3 as a Delta store, like this:
conf = spark.sparkContext._conf.setAll([('spark.delta.logStore.class','org.apache.spark.sql.delta.storage.S3SingleDriverLogStore')])
spark.sparkContext._conf.getAll()
As the name suggests, the S3SingleDriverLogStore implementation only works correctly when all concurrent writes originate from a single Spark driver. It is an application property, so it must be set before starting the SparkContext and cannot be changed during the lifetime of the context.
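Since the property has to be in place before the SparkContext starts, one option is to set it on the session builder rather than mutating the running context. A minimal sketch, assuming the Delta Lake package is on the classpath and using the same LogStore class as above; the app name is just a placeholder:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("XMLParser") \
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") \
    .getOrCreate()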
See the Databricks documentation for configuring the s3a access key and secret key.
Upvotes: 8