Reputation: 493
This is in continuation of this thread: how to save dataframe into csv pyspark.
I'm trying to save my PySpark DataFrame df in PySpark 3.0.1, so I wrote
df.coalesce(1).write.csv('mypath/df.csv')
But after executing this, I see a folder named df.csv in mypath which contains the following 4 files:
1. _committed_...
2. _started_...
3. _SUCCESS
4. part-00000-....csv
Can you suggest how I can save all the data into a single file df.csv?
Upvotes: 6
Views: 17056
Reputation: 928
I had the same issue and ended up using this function:
def WriteCsvToLocation(dataframe, location, filename):
    # write multi-part file to a temp location, then move the .csv to the desired location
    dataframe.coalesce(1).write.option("header", "true").option("escape", "\"") \
        .mode("overwrite").format("csv").save(location + "tmp")
    fileNames = dbutils.fs.ls(location + "tmp")
    name = ''
    for fileName in fileNames:
        if fileName.name.endswith('.csv'):
            name = fileName.name
    dbutils.fs.cp(location + "tmp/" + name, location + filename + ".csv")
    dbutils.fs.rm(location + "tmp", recurse=True)
You can check out this post for more details.
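For example, a hypothetical call (assuming a Databricks environment where dbutils is available; the paths are illustrative):
# Writes df as a single file to /mnt/output/report.csv.
# Note: `location` must end with a slash, since the function
# concatenates "tmp" and the filename directly onto it.
WriteCsvToLocation(df, "/mnt/output/", "report")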
Upvotes: 0
Reputation: 605
pandas.DataFrame.to_csv returns the csv as a string when the path is None (the default). You can write this string directly to any file on DBFS with dbutils.fs.put.
# collect the DataFrame to the driver and render it as one CSV string
csv_buffer = df.toPandas().to_csv(sep=';', header=True, index=False)
# write the string to DBFS in a single call
dbutils.fs.put('mypath/df.csv', csv_buffer, overwrite=True)
Note that this only works if all the data fits into the driver node's memory, since toPandas() collects everything to the driver.
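If you're not sure the data is small enough, a rough guard like this can fail fast before collecting (an illustrative sketch, not part of the original answer; the row threshold is arbitrary):
# Hypothetical guard: refuse to collect very large DataFrames to the driver.
MAX_ROWS = 1_000_000  # tune to your driver memory
if df.count() > MAX_ROWS:
    raise ValueError("DataFrame too large to collect; use the Spark CSV writer instead.")
csv_buffer = df.toPandas().to_csv(sep=';', header=True, index=False)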
Upvotes: 0
Reputation: 2416
You can use .coalesce(1) to save the file in just 1 csv partition, then rename this csv and move it to the desired folder.
Here is a function that does that:
df: your DataFrame
fileName: the name you want for the csv file
filePath: the folder where you want to save it
def export_csv(df, fileName, filePath):
    filePathDestTemp = filePath + ".dir/"
    df\
        .coalesce(1)\
        .write\
        .csv(filePathDestTemp)  # use .csv to save as csv
    listFiles = dbutils.fs.ls(filePathDestTemp)
    for subFiles in listFiles:
        if subFiles.name[-4:] == ".csv":
            # copy the single part file to the destination with the desired name
            dbutils.fs.cp(filePathDestTemp + subFiles.name, filePath + fileName + '.csv')
    dbutils.fs.rm(filePathDestTemp, recurse=True)
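A hypothetical call, assuming a Databricks workspace where dbutils is available (the path is illustrative):
# Produces /mnt/output/my_data.csv as a single file.
# Note: filePath must end with "/" because the function concatenates onto it.
export_csv(df, "my_data", "/mnt/output/")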
Upvotes: 5
Reputation: 32640
If you want to get one file named df.csv as output, you can first write into a temporary folder, then move the part file generated by Spark and rename it.
These steps can be done using the Hadoop FileSystem API, available via the JVM gateway:
temp_path = "mypath/__temp"
target_path = "mypath/df.csv"
df.coalesce(1).write.mode("overwrite").csv(temp_path)
# access the Hadoop Path class through the py4j JVM gateway
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
# get the part file generated by spark write
fs = Path(temp_path).getFileSystem(sc._jsc.hadoopConfiguration())
csv_part_file = fs.globStatus(Path(temp_path + "/part*"))[0].getPath()
# move and rename the file
fs.rename(csv_part_file, Path(target_path))
fs.delete(Path(temp_path), True)
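In case sc isn't already defined in your environment, you can get it from the active session first (a minimal sketch; only standard PySpark APIs are used):
from pyspark.sql import SparkSession

# obtain the SparkContext that the snippet above refers to as `sc`
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext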
Upvotes: 2