PyRsquared

Reputation: 7338

How to export a table dataframe in PySpark to csv?

I am using Spark 1.3.1 (PySpark) and I have generated a table using a SQL query. I now have an object that is a DataFrame. I want to export this DataFrame object (I have called it "table") to a csv file so I can manipulate it and plot the columns. How do I export the DataFrame "table" to a csv file?

Upvotes: 125

Views: 471605

Answers (9)

zero323

Reputation: 330093

If the data frame fits in driver memory and you want to save it to the local file system, you can convert the Spark DataFrame to a local Pandas DataFrame using the toPandas method and then simply use to_csv:

df.toPandas().to_csv('mycsv.csv')

Otherwise you can use spark-csv:

  • Spark 1.3

      df.save('mycsv.csv', 'com.databricks.spark.csv')
    
  • Spark 1.4+

      df.write.format('com.databricks.spark.csv').save('mycsv.csv')
    

In Spark 2.0+ you can use the csv data source directly:

df.write.csv('mycsv.csv')

Upvotes: 266

s510

Reputation: 2822

Using PySpark

The easiest way to write to csv in Spark 3.0+:

sdf.write.csv("/path/to/csv/data.csv")

This can generate multiple part files, one per partition of the DataFrame. If you want a single file, use repartition:

sdf.repartition(1).write.csv("/path/to/csv/data.csv")

or the more efficient option:

sdf.coalesce(1).write.csv("/path/to/csv/data.csv")

Using Pandas

If your data is small enough to be held in local Python memory, you can make use of pandas too:

sdf.toPandas().to_csv("/path/to/csv/data.csv", index=False)

Using Koalas

sdf.to_koalas().to_csv("/path/to/csv/data.csv", index=False)

Note that Koalas has since been merged into Spark itself; on Spark 3.2+ the equivalent is sdf.pandas_api().to_csv("/path/to/csv/data.csv", index=False).

Upvotes: 11

dylanvanw

Reputation: 3341

I used the method with pandas and it gave me horrible performance. In the end it took so long that I stopped it and looked for another method.

If you are looking for a way to write to one csv instead of multiple csvs, this is what you are looking for:

df.coalesce(1).write.csv("train_dataset_processed", header=True)

It reduced the processing of my dataset from 2+ hours to 2 minutes.

Upvotes: 1

Matei Florescu

Reputation: 1195

How about this (in case you don't want a one-liner)?

for row in df.collect():
    d = row.asDict()
    s = "%d\t%s\t%s\n" % (d["int_column"], d["string_column"], d["string_column"])
    f.write(s)

f is an opened file descriptor, e.g. f = open("out.tsv", "w"). Also, the separator is a TAB char, but it's easy to change to whatever you want.

Upvotes: 4

Megha Jaiswal

Reputation: 600

Try display(df) (available in Databricks notebooks) and use the download option in the results. Please note: only 1 million rows can be downloaded with this option, but it's really quick.

Upvotes: 0

Aku

Reputation: 802

I am late to the party, but this will let me rename the file, move it to a desired directory, and delete the unwanted additional directory Spark made:

import shutil
import os
import glob

path = 'test_write'
#write single csv
students.repartition(1).write.csv(path)

#rename and relocate the csv
shutil.move(glob.glob(os.getcwd() + '\\' + path + '\\' + r'*.csv')[0], os.getcwd()+ '\\' + path+ '.csv')

#remove additional directory
shutil.rmtree(os.getcwd()+'\\'+path)
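The path handling above is Windows-specific (the '\\' separators). A portable sketch of the same clean-up steps, written as a helper with os.path.join (the directory and function names here are just placeholders):

```python
import glob
import os
import shutil

def promote_single_csv(out_dir):
    """Move the single part-*.csv that Spark wrote inside out_dir up to
    out_dir + '.csv', then delete the now-empty output directory."""
    part = glob.glob(os.path.join(out_dir, "part-*.csv"))[0]
    shutil.move(part, out_dir + ".csv")
    shutil.rmtree(out_dir)

# after students.repartition(1).write.csv('test_write'):
# promote_single_csv('test_write')
```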

Upvotes: 2

Gazal Patel

Reputation: 480

You need to repartition the DataFrame into a single partition and then define the format, path and other parameters for the file in Unix file-system format, and here you go:

df.repartition(1).write.format('com.databricks.spark.csv').save("/path/to/file/myfile.csv",header = 'true')

However, repartition is a costly function, and toPandas() is the worst. Try using .coalesce(1) instead of .repartition(1) in the previous syntax for better performance.

Upvotes: 12

Hafiz Muhammad Shafiq

Reputation: 8670

For Apache Spark 2+, in order to save the dataframe into a single csv file, use the following command:

query.repartition(1).write.csv("cc_out.csv", sep='|')

Here 1 indicates that I need only one partition of the csv. You can change it according to your requirements.
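Since sep='|' makes Spark write pipe-delimited part files rather than comma-separated ones, the matching delimiter is needed when reading them back. A minimal stdlib sketch of parsing such a line (the sample data is made up):

```python
import csv
import io

# a line as Spark's csv writer with sep='|' would emit it
sample = "1|hello|world\n"
rows = list(csv.reader(io.StringIO(sample), delimiter="|"))
print(rows[0])  # ['1', 'hello', 'world']
```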

Upvotes: 47

jbochi

Reputation: 29634

If you cannot use spark-csv, you can do the following:

df.rdd.map(lambda x: ",".join(map(str, x))).coalesce(1).saveAsTextFile("file.csv")

If you need to handle strings with linebreaks or commas, that will not work. Use this:

import csv
import cStringIO

def row2csv(row):
    buffer = cStringIO.StringIO()
    writer = csv.writer(buffer)
    writer.writerow([str(s).encode("utf-8") for s in row])
    buffer.seek(0)
    return buffer.read().strip()

df.rdd.map(row2csv).coalesce(1).saveAsTextFile("file.csv")
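Note that cStringIO and str.encode here are Python 2 idioms (this answer dates from the Spark 1.x era). A hedged Python 3 equivalent of row2csv, using io.StringIO from the standard library:

```python
import csv
import io

def row2csv(row):
    # csv.writer handles quoting of embedded commas and linebreaks
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow([str(s) for s in row])
    return buffer.getvalue().strip()

# df.rdd.map(row2csv).coalesce(1).saveAsTextFile("file.csv")
```

Fields containing the delimiter come out quoted, e.g. row2csv([1, "a,b", "c"]) returns '1,"a,b",c'.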

Upvotes: 21
