Reputation: 41
I'm trying to use Spark to convert a bunch of CSV files to parquet, with the interesting twist that the input CSV files are already "partitioned" by directory. All the input files have the same set of columns. The input file structure looks like:
/path/dir1/file1.csv
/path/dir1/file2.csv
/path/dir2/file3.csv
/path/dir3/file4.csv
/path/dir3/file5.csv
/path/dir3/file6.csv
I'd like to read those files with Spark and write their data to a parquet table in HDFS, preserving the partitioning (partitioned by input directory) and such that there is a single output file per partition. The output file structure should look like:
hdfs://path/dir=dir1/part-r-xxx.gz.parquet
hdfs://path/dir=dir2/part-r-yyy.gz.parquet
hdfs://path/dir=dir3/part-r-zzz.gz.parquet
The best solution I have found so far is to loop over the input directories, loading the CSV files into a dataframe and writing the dataframe to its target partition in the parquet table. But this is not efficient: since I want a single output file per partition, the write to HDFS is a single task that blocks the loop at each iteration. I wonder how to achieve this with maximum parallelism (and without shuffling the data across the cluster).
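For reference, a minimal sketch of that per-directory loop, using the modern DataFrame CSV reader; the paths, directory names, and header option are illustrative, not from my actual job:

```python
# Hypothetical sketch of the serial loop described above.
# Each iteration's write is a single task, so iterations block each other.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

for d in ['dir1', 'dir2', 'dir3']:  # assumption: known input dirs
    (spark.read
        .option('header', 'true')       # adjust to your CSV layout
        .csv('/path/{}'.format(d))
        .coalesce(1)                    # single output file per partition
        .write
        .parquet('hdfs://path/dir={}'.format(d)))
```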
Thanks!
Upvotes: 4
Views: 14009
Reputation: 41
Best solution I've found so far (no shuffling and as many threads as input dirs); see the sketch after these steps:
1. Create an RDD of input dirs, with as many partitions as input dirs.
2. Transform it into an RDD of input files (preserving the partitioning by dir).
3. Flat-map it with a custom CSV parser.
4. Convert the RDD to a dataframe.
5. Write the dataframe to a parquet table partitioned by dir.
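A minimal PySpark sketch of those steps, assuming the executors can list and open the input files directly (e.g. a shared filesystem or local mode) and that every CSV has the same two columns `a` and `b`; all names, paths, and the schema are illustrative:

```python
import csv
import os

from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

base = '/path'
input_dirs = sorted(os.listdir(base))  # e.g. ['dir1', 'dir2', 'dir3']

# Step 1: one RDD partition per input dir, so each dir becomes one task.
dirs_rdd = sc.parallelize(input_dirs, len(input_dirs))

# Step 2: expand each dir into its csv files; flatMap keeps each file in
# the RDD partition of its dir, so nothing is shuffled.
files_rdd = dirs_rdd.flatMap(
    lambda d: [(d, os.path.join(base, d, f))
               for f in sorted(os.listdir(os.path.join(base, d)))])

# Step 3: custom csv parser, emitting one Row per line and tagging each
# row with the dir it came from.
def parse(dir_and_path):
    d, path = dir_and_path
    with open(path) as fh:
        for a, b in csv.reader(fh):  # assumed two-column schema
            yield Row(a=a, b=b, dir=d)

rows_rdd = files_rdd.flatMap(parse)

# Steps 4-5: convert to a dataframe, then write one parquet file per
# dir= partition (each dir's rows live in exactly one RDD partition).
df = spark.createDataFrame(rows_rdd)
df.write.partitionBy('dir').parquet('hdfs://path/output')
```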
It requires writing your own parser. I could not find a way to preserve the partitioning using sc.textFile or the Databricks CSV parser.
Upvotes: 0
Reputation: 13936
Rename your input directories, changing dirX to dir=dirX. Then perform:
spark.read.csv('/path/').coalesce(1).write.partitionBy('dir').parquet('output')
If you cannot rename the directories, you can use the Hive metastore: create an external table with one partition per directory, then load that table and rewrite it using the pattern above.
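A rough sketch of that Hive route; the table name, columns, and paths are made up for illustration, and it needs a Hive-enabled SparkSession:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Hypothetical external table over the existing csv layout.
spark.sql("""
    CREATE EXTERNAL TABLE csv_input (a STRING, b STRING)
    PARTITIONED BY (dir STRING)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    LOCATION '/path'
""")

# Register one partition per existing directory.
for d in ['dir1', 'dir2', 'dir3']:  # assumption: known dirs
    spark.sql(
        "ALTER TABLE csv_input ADD PARTITION (dir='{0}') "
        "LOCATION '/path/{0}'".format(d))

# Rewrite with the same pattern as above.
(spark.table('csv_input')
    .coalesce(1)
    .write.partitionBy('dir')
    .parquet('output'))
```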
Upvotes: 1