Reputation: 115
I am trying to save a data frame as a text file, however, I am getting a File Already Exists exception. I tried adding the mode to the code but to no avail. Furthermore, the file does not actually exists. Would anyone have an idea how I can solve this problem? I am using PySpark
This is the code:
distFile = sc.textFile("/Users/jeremy/Downloads/sample2.nq")
mapper = distFile.map(lambda q: __q2v(q))
reducer = mapper.reduceByKey(lambda a, b: a + os.linesep + b)
data_frame = reducer.toDF(["context", "triples"])
data_frame.coalesce(1).write.partitionBy("context").text("/Users/jeremy/Desktop/so")
May I add that the exception is being raised after some time and that some data is actually stored in temporary files (which are obviously deleted).
Thanks!
Edit: Exception can be found here: https://gist.github.com/jerdeb/c30f65dc632fb997af289dac4d40c743
Upvotes: 6
Views: 10346
Reputation: 317
You should check your executors and look at the logs of the ones that are failing.
In my case, I had a coalesce(1)
on a large DF. 4 of my executors failed - 3 of them had the same error of org.apache.hadoop.fs.FileAlreadyExistsException: File already exists
.
However, 1 of them had a different exception: org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 262144 bytes of memory, got 148328
I was able to fix it by increasing the executor memory so that the coalesce did not cause an out of memory
error.
Upvotes: 1
Reputation: 128
I had the same problem and was able get around it with this:
outputDir = "/FileStore/tables/my_result/"
dbutils.fs.rm(outputDir , True)
Just change the outputDir variable to whatever directory you are writing to.
Upvotes: 0
Reputation: 3696
you can used overwrite
or append
for replacing the file or adding the data into same file.
data_frame.coalesce(1).write.mode('overwrite').partitionBy("context").text("/Users/jeremy/Desktop/so")
or
data_frame.coalesce(1).write.mode('append').partitionBy("context").text("/Users/jeremy/Desktop/so")
Upvotes: 3