Mike

Reputation: 60848

How to improve performance of csv to parquet conversion using pyspark?

I have a large dataset I need to convert from csv to parquet format using pyspark. There is approximately 500GB of data scattered across thousands of csv files. My initial implementation is simplistic ...

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local") \
    .appName("test") \
    .getOrCreate()

df = spark.read.csv(input_files, header=True, inferSchema=True)

df.repartition(1).write.mode('overwrite').parquet(output_dir)

The performance is abysmal; I have let it run for 2+ hours before giving up. From the logging output, I infer that it does not even finish reading the csv files into the dataframe.

I am running Spark locally on a server with 128 high-performance CPU cores and 1TB of memory. Disk storage is SSD-based, with confirmed read speeds of 650 MB/s. My intuition is that, given the computing resources available, I should be able to improve performance significantly. I'm looking for tips on how to do this.

I have tried...

I'm stuck using pyspark for now per management direction, but if necessary I can convince them to use a different tool.

Upvotes: 1

Views: 3058

Answers (2)

blackbishop

Reputation: 32720

Some possible improvements:

  • Don't use .repartition(1), as you lose parallelism for the write operation
  • Persist/cache the dataframe before writing: df.persist()

If you really need to save it as 1 parquet file, you can first write into a temp folder without reducing partitions, then use coalesce in a second write operation:

df = spark.read.csv(input_files, header=True, inferSchema=True).persist()
# ....

# First write with the original partitioning so the work stays parallel
df.write.mode('overwrite').parquet("/temp/folder")
df.unpersist()

# Re-read the parquet output and collapse it into a single file
df1 = spark.read.parquet("/temp/folder")
df1.coalesce(1).write.mode('overwrite').parquet(output_dir)

Upvotes: 1

Nikunj Kakadiya

Reputation: 3008

Some suggestions based on my experience working with Spark:

  • You should not infer the schema if you are dealing with huge data; define it explicitly instead (a minimal sketch follows this list). It might not give a dramatic improvement, but it saves Spark an extra pass over the csv files just to work out the column types.
  • Don't use repartition(1), as it shuffles all the data into a single partition, which is exactly what you don't want with the volume of data you have. I would suggest increasing the number of partitions, if possible based on your cluster configuration, so the parquet files are written in parallel and therefore faster.
  • Don't cache/persist your data frame if you are just reading the csv files and saving them as parquet in the next step. It can increase the total time, since caching itself takes time. Caching would have helped if you were performing multiple transformations and then multiple actions on the data frame; here you are performing only one action (writing the data frame as parquet), so in my opinion you should not cache it.
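
A minimal sketch of the first two points, assuming the spark session, input_files and output_dir from the question; the schema here is a made-up placeholder that you would replace with your real columns:

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Hypothetical schema -- swap in the actual column names and types of your csv files
schema = StructType([
    StructField("id", StringType(), True),
    StructField("value", DoubleType(), True),
])

# Passing an explicit schema avoids the extra scan that inferSchema=True requires
df = spark.read.csv(input_files, header=True, schema=schema)

# No repartition(1): keeping many partitions lets the write run in parallel,
# producing one parquet file per partition instead of one huge file
df.write.mode('overwrite').parquet(output_dir)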

Upvotes: 3
