inquisitiveProgrammer
inquisitiveProgrammer

Reputation: 992

Convert csv to parquet file using python

I am trying to convert a .csv file to a .parquet file.
The csv file (Temp.csv) has the following format

1,Jon,Doe,Denver

I am using the following python code to convert it into parquet

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import os

if __name__ == "__main__":
    sc = SparkContext(appName="CSV2Parquet")
    sqlContext = SQLContext(sc)

    schema = StructType([
            StructField("col1", IntegerType(), True),
            StructField("col2", StringType(), True),
            StructField("col3", StringType(), True),
            StructField("col4", StringType(), True)])
    dirname = os.path.dirname(os.path.abspath(__file__))
    csvfilename = os.path.join(dirname,'Temp.csv')    
    rdd = sc.textFile(csvfilename).map(lambda line: line.split(","))
    df = sqlContext.createDataFrame(rdd, schema)
    parquetfilename = os.path.join(dirname,'output.parquet')    
    df.write.mode('overwrite').parquet(parquetfilename)

The result is only a folder named, output.parquet and not a parquet file that I'm looking for, followed by the following error on the console.

CSV to Parquet Error

I have also tried running the following code to face a similar issue.

from pyspark.sql import SparkSession
import os

spark = SparkSession \
    .builder \
    .appName("Protob Conversion to Parquet") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# read csv
dirname = os.path.dirname(os.path.abspath(__file__))
csvfilename = os.path.join(dirname,'Temp.csv')    
df = spark.read.csv(csvfilename)

# Displays the content of the DataFrame to stdout
df.show()
parquetfilename = os.path.join(dirname,'output.parquet')    
df.write.mode('overwrite').parquet(parquetfilename)

How to best do it? Using windows, python 2.7.

Upvotes: 56

Views: 142630

Answers (10)

Mayank Mishra
Mayank Mishra

Reputation: 1

import pyarrow.csv as csv

dataframe = csv.read_csv("file.csv")



pyarrow.parquet.write_table(dataframe,"dataframe.parquet")

Upvotes: 0

Yuriy Gavrilov
Yuriy Gavrilov

Reputation: 11

it helps for me.

import pandas as pd
df = pd.read_csv('example.csv', low_memory=False)
df.to_parquet('output.parquet', engine="fastparquet")

Upvotes: 1

Abdelhak
Abdelhak

Reputation: 294

You can use the pyspark library to convert a CSV file to a Parquet file. Here is an example of how you can do this:

rc = spark.read.csv('/path/file.csv', header=True)
rc.write.format("parquet").save('/path/file.parquet')

This code reads a CSV file and the convert it to a Parquet file.

Upvotes: 0

ns15
ns15

Reputation: 8704

Handling larger than memory CSV files

Below code converts CSV to Parquet without loading the whole csv file into the memory

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

new_schema = pa.schema([
    ('col1', pa.int64()),
    ('col2', pa.int64()),
    ('newcol', pa.int64())
])

csv_column_list = ['col1', 'col2']

with pq.ParquetWriter('my_parq_data.parquet', schema=new_schema) as writer:
    with pd.read_csv('my_data.csv', header=None, names=csv_column_list, chunksize=100000) as reader:
        for df in reader:
            # transformation: transform df by adding a new static column with column name 'newcol' and value 9999999
            df['newcol'] = 9999999
            # convert pandas df to record batch
            transformed_batch = pa.RecordBatch.from_pandas(df, schema=new_schema)
            writer.write_batch(transformed_batch)  

Above code:

  1. Reads the large CSV file in chunks.
  2. Transforms the data frame by adding the new column.
  3. Converts the df to arrow record batch.
  4. Writes the transformed arrow batch as a new row group to the parquet file.

Note: Do not keep the chunk size very low. This will result in poor compression since chunk size corresponds to the row group size in the new parquet file as well.

Upvotes: 8

Powers
Powers

Reputation: 19308

There are a few different ways to convert a CSV file to Parquet with Python.

Uwe L. Korn's Pandas approach works perfectly well.

Use Dask if you'd like to convert multiple CSV files to multiple Parquet / a single Parquet file. This will convert multiple CSV files into two Parquet files:

import dask.dataframe as dd

df = dd.read_csv('./data/people/*.csv')
df = df.repartition(npartitions=4)
df.to_parquet('./tmp/people_parquet4')

You could also use df.repartition(npartitions=1) if you'd only like to output one Parquet file. More info on converting CSVs to Parquet with Dask here.

Here's a PySpark snippet that works in a Spark environment:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
  .master("local") \
  .appName("parquet_example") \
  .getOrCreate()

df = spark.read.csv('data/us_presidents.csv', header = True)
df.repartition(1).write.mode('overwrite').parquet('tmp/pyspark_us_presidents')

You can also use Koalas in a Spark environment:

import databricks.koalas as ks

df = ks.read_csv('data/us_presidents.csv')
df.to_parquet('tmp/koala_us_presidents')

Upvotes: 12

Shuli Hakim
Shuli Hakim

Reputation: 41

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import sys

sc = SparkContext(appName="CSV2Parquet")
sqlContext = SQLContext(sc)

schema = StructType([
    StructField("col1", StringType(), True),
    StructField("col2", StringType(), True),
    StructField("col3", StringType(), True),
    StructField("col4", StringType(), True),
    StructField("col5", StringType(), True)])
rdd = sc.textFile('/input.csv').map(lambda line: line.split(","))
df = sqlContext.createDataFrame(rdd, schema)
df.write.parquet('/output.parquet')

Upvotes: 1

taras
taras

Reputation: 6914

You can convert csv to parquet using pyarrow only - without pandas. It might be useful when you need to minimize your code dependencies (ex. with AWS Lambda).

import pyarrow.csv as pv
import pyarrow.parquet as pq

table = pv.read_csv(filename)
pq.write_table(table, filename.replace('csv', 'parquet'))

Refer to the pyarrow docs to fine-tune read_csv and write_table functions.

Upvotes: 39

ishwar
ishwar

Reputation: 298

You can write as a PARQUET FILE using spark:

spark = SparkSession.builder.appName("Test_Parquet").master("local[*]").getOrCreate()

parquetDF = spark.read.csv("data.csv")

parquetDF.coalesce(1).write.mode("overwrite").parquet("Parquet")

I hope this helps

Upvotes: 1

Amol More
Amol More

Reputation: 151

import boto3
import pandas as pd
import pyarrow as pa
from s3fs import S3FileSystem
import pyarrow.parquet as pq

s3 = boto3.client('s3',region_name='us-east-2')
obj = s3.get_object(Bucket='ssiworkoutput', Key='file_Folder/File_Name.csv')
df = pd.read_csv(obj['Body'])

table = pa.Table.from_pandas(df)

output_file = "s3://ssiworkoutput/file/output.parquet"  # S3 Path need to mention
s3 = S3FileSystem()

pq.write_to_dataset(table=table,
                    root_path=output_file,partition_cols=['Year','Month'],
                    filesystem=s3)

print("File converted from CSV to parquet completed")

Upvotes: 15

Uwe L. Korn
Uwe L. Korn

Reputation: 8796

Using the packages pyarrow and pandas you can convert CSVs to Parquet without using a JVM in the background:

import pandas as pd
df = pd.read_csv('example.csv')
df.to_parquet('output.parquet')

One limitation in which you will run is that pyarrow is only available for Python 3.5+ on Windows. Either use Linux/OSX to run the code as Python 2 or upgrade your windows setup to Python 3.6.

Upvotes: 73

Related Questions