JDoe

Reputation: 493

How to save a PySpark DataFrame as a single CSV file

This is a continuation of the "how to save dataframe into csv pyspark" thread.

I'm trying to save my PySpark DataFrame df as a single CSV file, using PySpark 3.0.1. So I wrote

df.coalesce(1).write.csv('mypath/df.csv')

But after executing this, I see a folder named df.csv in mypath, which contains the following 4 files:

1. _committed_...
2. _started_...
3. _SUCCESS
4. part-00000-...csv

Can you suggest how I can save all the data in a single file named df.csv?

Upvotes: 6

Views: 17056

Answers (4)

Thusi

Reputation: 928

I had the same issue and ended up using this function:

def WriteCsvToLocation(dataframe, location, filename):
    # Write a single-partition CSV into a temp folder, then move the
    # part file to the desired location. `location` must end with "/",
    # since paths are concatenated directly.
    dataframe.coalesce(1) \
        .write \
        .option("header", "true") \
        .option("escape", "\"") \
        .mode("overwrite") \
        .format("csv") \
        .save(location + "tmp")

    # Find the .csv part file Spark generated in the temp folder.
    name = ''
    for fileName in dbutils.fs.ls(location + "tmp"):
        if fileName.name.endswith('.csv'):
            name = fileName.name

    # Copy it to the desired name and remove the temp folder.
    dbutils.fs.cp(location + "tmp/" + name, location + filename + ".csv")
    dbutils.fs.rm(location + "tmp", recurse=True)
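
For example, with a hypothetical DBFS folder (note that location must end with "/"):

WriteCsvToLocation(df, "dbfs:/mypath/", "df")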

You can check out this post for more details.

Upvotes: 0

Michael H.

Reputation: 605

In pandas.DataFrame.to_csv, when the path is None the function returns the CSV as a string. You can write this string directly to any file on DBFS with dbutils.fs.put.

csv_buffer = df.toPandas().to_csv(sep=';', header=True, index=False)
dbutils.fs.put('mypath/df.csv', csv_buffer, overwrite=True)

I guess this only works if all data fits into the driver node's memory.
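
As a quick sanity check on Databricks, you could peek at the beginning of the file that was just written (a sketch; 200 is just an arbitrary byte count):

print(dbutils.fs.head('mypath/df.csv', 200))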

Upvotes: 0

Luiz Viola

Reputation: 2416

You can use .coalesce(1) to write the data as a single CSV partition, then rename that CSV and move it to the desired folder.

Here is a function that does that:

df: Your DataFrame
fileName: Name you want for the csv file
filePath: Folder where you want to save it

def export_csv(df, fileName, filePath):
    # Write a single-partition CSV into a temporary folder.
    filePathDestTemp = filePath + ".dir/"
    df.coalesce(1).write.csv(filePathDestTemp)

    # Copy the part file Spark generated to the desired name.
    for subFile in dbutils.fs.ls(filePathDestTemp):
        if subFile.name.endswith(".csv"):
            dbutils.fs.cp(filePathDestTemp + subFile.name, filePath + fileName + ".csv")

    # Clean up the temporary folder.
    dbutils.fs.rm(filePathDestTemp, recurse=True)
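
For example, with a hypothetical DBFS folder (filePath is expected to end with a slash, since the function concatenates paths directly):

export_csv(df, "df", "dbfs:/mypath/")

This leaves a single file dbfs:/mypath/df.csv and removes the temporary dbfs:/mypath/.dir/ folder.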

Upvotes: 5

blackbishop

Reputation: 32640

If you want to get one file named df.csv as output, you can first write into a temporary folder, then move the part file generated by Spark and rename it.

These steps can be done using the Hadoop FileSystem API, which is available via the JVM gateway:

temp_path = "mypath/__temp"
target_path = "mypath/df.csv"

df.coalesce(1).write.mode("overwrite").csv(temp_path)

Path = sc._gateway.jvm.org.apache.hadoop.fs.Path

# get the part file generated by spark write
fs = Path(temp_path).getFileSystem(sc._jsc.hadoopConfiguration())
csv_part_file = fs.globStatus(Path(temp_path + "/part*"))[0].getPath()

# move and rename the file
fs.rename(csv_part_file, Path(target_path))
fs.delete(Path(temp_path), True)
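
Since this uses the Hadoop FileSystem API that Spark itself writes through, it works the same on HDFS, DBFS, S3, etc., with no Databricks-specific utilities. As a quick check, you could read the single file back (a sketch; note the write above set no header option, so column names are not preserved):

spark.read.csv(target_path).show(5)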

Upvotes: 2
