Reputation: 81
We're evaluating AWS Glue for a big data project, with some ETL. We added a crawler, which is correctly picking up a CSV file from S3. Initially, we simply want to transform that CSV to JSON, and drop the file in another S3 location (same bucket, different path).
We used the script as provided by AWS (no custom script here). And just mapped all the columns.
The target folder is empty (job has been just created), but the job fails with "File already exists": snapshot here. The S3 location were we pretend to drop the output was empty before starting the job. However after the error we do see two files, but those seems to be partials: snapshot
Any ideas on what might be going on?
Here's the fully stack:
Container: container_1513099821372_0007_01_000001 on ip-172-31-49-38.ec2.internal_8041 LogType:stdout Log Upload Time:Tue Dec 12 19:12:04 +0000 2017 LogLength:8462 Log Contents: Traceback (most recent call last): File "script_2017-12-12-19-11-08.py", line 30, in datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = { "path": "s3://primero-viz/output/tcw_entries" } , format = "json", transformation_ctx = "datasink2") File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/PyGlue.zip/awsglue/dynamicframe.py", line 523, in from_options File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/PyGlue.zip/awsglue/context.py", line 175, in write_dynamic_frame_from_options File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/PyGlue.zip/awsglue/context.py", line 198, in write_from_options File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/PyGlue.zip/awsglue/data_sink.py", line 32, in write File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/PyGlue.zip/awsglue/data_sink.py", line 28, in writeFrame File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__ File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o86.pyWriteDynamicFrame. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, ip-172-31-63-141.ec2.internal, executor 1): java.io.IOException: File already exists:s3://primero-viz/output/tcw_entries/run-1513105898742-part-r-00000 at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.create(S3NativeFileSystem.java:604) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:915) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:896) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:793) at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.create(EmrFileSystem.java:176) at com.amazonaws.services.glue.hadoop.TapeOutputFormat.getRecordWriter(TapeOutputFormat.scala:65) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1119) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1102) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1158) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1085) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:1005) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:996) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:996) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:996) at com.amazonaws.services.glue.HadoopDataSink$$anonfun$2.apply$mcV$sp(DataSink.scala:192) at com.amazonaws.services.glue.HadoopDataSink.writeDynamicFrame(DataSink.scala:202) at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:48) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:280) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: File already exists:s3://primero-viz/output/tcw_entries/run-1513105898742-part-r-00000
Upvotes: 7
Views: 4203
Reputation: 50
For me, a similar error message turned out to be unrelated to the file already existing. The error message can be a little misleading. There was a problem in a previous stage (in my case, I was reading data from a MySQL database in a previous stage and the source DB contained an invalid date, which resulted in partial data being written and the task crashing).
I would suggest checking the other stages leading up to this write.
See also this other StackOverflow answer.
Upvotes: 0
Reputation: 39
Setup the write mode to "append" whether your load is incremental or "overwrite" if it's full load.
One example could be:
events.toDF().write.json(events_dir, mode="append", partitionBy=["partition_0", "partition_1"])
Upvotes: -1
Reputation: 1
The target folder is empty
Empty is not the same as not exist. It doesn't look like write_dynamic_frame
supports write modes so might have to drop the directory first.
Upvotes: -2