Reputation: 60848
I have a large dataset I need to convert from csv to parquet format using pyspark. There is approximately 500GB of data scattered across thousands of csv files. My initial implementation is simplistic ...
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local") \
    .appName("test") \
    .getOrCreate()
df = spark.read.csv(input_files, header=True, inferSchema=True)
df.repartition(1).write.mode('overwrite').parquet(output_dir)
The performance is abysmal; I let it run for 2+ hours before giving up. From the logging output, I infer that it does not even finish reading the csv files into the dataframe.
I am running spark locally on a server with 128 high performance CPU cores and 1TB of memory. Disk storage is SSD based with confirmed read speeds of 650 MB/s. My intuition is that I should be able to significantly improve performance given the computing resources available. I'm looking for tips on how to do this.
I have tried...
not inferring the schema; this did not produce a noticeable difference in performance (the schema is four columns of text)
using the configuration setting spark.executor.cores to match the number of physical cores on my server. The setting did not seem to have any effect; I did not observe the system using more cores.
I'm stuck using pyspark for now per management direction, but if necessary I can convince them to use a different tool.
Upvotes: 1
Views: 3058
Reputation: 32720
Some possible improvements:
Don't use .repartition(1), as you lose parallelism for the write operation.
Persist the dataframe before writing: df.persist()
If you really need to save it as 1 parquet file, you can first write into a temp folder without reducing partitions, then use coalesce in a second write operation:
df = spark.read.csv(input_files, header=True, inferSchema=True).persist()
# ....
df.write.mode('overwrite').parquet("/temp/folder")
df.unpersist()
df1 = spark.read.parquet("/temp/folder")
df1.coalesce(1).write.mode('overwrite').parquet(output_dir)
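For reference, here is a minimal sketch of the two-step write above, put into one function. It rests on a few assumptions that are not in the question: the names local_master, partitions_for and convert, the tmp_dir argument and the 512 MiB target file size are all hypothetical, and the all-string schema is only based on the "four columns of text" remark. It also swaps .master("local"), which runs Spark with a single worker thread, for local[*] (one thread per logical core), and it leaves out persist() because each dataframe is only used once here. Treat it as a starting point, not a tuned solution.

```python
import math


def local_master(threads=None):
    """Return a Spark master URL for local mode.

    "local" alone runs with one worker thread; "local[N]" uses N threads
    and "local[*]" uses one thread per logical core.
    """
    return "local[*]" if threads is None else f"local[{threads}]"


def partitions_for(total_bytes, target_file_bytes=512 * 1024**2):
    """Rough output file count so each file covers ~target_file_bytes of input.

    The 512 MiB default is an assumption, not a Spark recommendation.
    """
    return max(1, math.ceil(total_bytes / target_file_bytes))


def convert(input_files, tmp_dir, output_dir, column_names, n_files=1):
    # Imported here so the helpers above work without pyspark installed.
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StringType, StructField, StructType

    spark = (
        SparkSession.builder
        .master(local_master())  # use all logical cores instead of one
        .appName("csv-to-parquet")
        .getOrCreate()
    )

    # Explicit all-string schema (assumed), so no inferSchema pass is needed.
    schema = StructType(
        [StructField(name, StringType(), True) for name in column_names]
    )

    # Step 1: write with the natural partitioning so the write stays parallel.
    df = spark.read.csv(input_files, header=True, schema=schema)
    df.write.mode("overwrite").parquet(tmp_dir)

    # Step 2: compact the intermediate Parquet data into n_files files.
    compacted = spark.read.parquet(tmp_dir).coalesce(n_files)
    compacted.write.mode("overwrite").parquet(output_dir)
```

If a single 500GB file is not a hard requirement, passing something like n_files=partitions_for(total_input_bytes) keeps the second write parallel as well.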
Upvotes: 1
Reputation: 3008
Some suggestions based on my experience working with spark :
Upvotes: 3