Reputation: 304
When I load data into pyspark dataframe from s3 bucket then make some manipulations (join, union) and then I try to overwrite the same path ('data/csv/') I read before. I'm getting this error:
py4j.protocol.Py4JJavaError: An error occurred while calling
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(
at sun.reflect.DelegatingMethodAccessorImpl.invoke(
at java.lang.reflect.Method.invoke(
at py4j.reflection.MethodInvoker.invoke(
at py4j.reflection.ReflectionEngine.invoke(
at py4j.Gateway.invoke(
at py4j.commands.AbstractCommand.invokeMethod(
at py4j.commands.CallCommand.execute(
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 200 in stage 120.0 failed 4 times, most recent failure: Lost task 200.3 in stage 120.0: Key 'data/csv/part-00000-68ea927d-1451-4a84-acc7-b91e94d0c6a3-c000.csv' does not exist in S3
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
csv_a = spark \
.read \
.format('csv') \
.option("header", "true") \
.load('s3n://mybucket/data/csv') \
.where('some condition')
csv_b = spark \
.read \
.format('csv') \
.option("header", "true") \
# Reading glue categories data
cc = spark \
.sql("select * from mydatabase.mytable where month='06'") \
# Joining and Union
output = csv_b \
.join(cc, (csv_b.key == cc.key), 'inner') \
.select('csv.key', 'csv.created_ts', '', 'csv.text') \
.drop_duplicates(['key']) \
.union(csv_a) \
.orderBy('name') \
.coalesce(1) \
.write \
.format('csv') \
.option('header', 'true') \
.mode('overwrite') \
I need to read data from s3 location, then, join, union with another data and finally overwrite initial path to keep only one csv file with clean joined data.
If I try to read (load) data from another s3 path not the same as I need to overwrite, it works and overwrites ok.
Any ideas why does this error happen?
Upvotes: 0
Views: 3605
Reputation: 304
When you are reading data from a folder, modifying it and saving on top of the data which you have initially read, spark tries to overwrite the same key on s3 (file on hdfs) etc...
I've found 2 options:
Resolved by adding .persist(StorageLevel.MEMORY_AND_DISK)
output = csv_b \
.join(cc, (csv_b.key == cc.key), 'inner') \
.select('csv.key', 'csv.created_ts', '', 'csv.text') \
.drop_duplicates(['key']) \
.union(csv_a) \
.orderBy('name') \
.coalesce(1) \
.persist(StorageLevel.MEMORY_AND_DISK) \
.write \
.format('csv') \
.option('header', 'true') \
.mode('overwrite') \
Upvotes: 0