samthebest

Reputation: 31553

How to split parquet files into many partitions in Spark?

So I have just 1 parquet file I'm reading with Spark (using the SQL stuff) and I'd like it to be processed with 100 partitions. I've tried setting spark.default.parallelism to 100, and we have also tried changing the compression of the parquet to none (from gzip). No matter what we do, the first stage of the Spark job has only a single partition (once a shuffle occurs it gets repartitioned into 100 and thereafter things are obviously much, much faster).

Now according to a few sources (like below) parquet should be splittable (even if using gzip!), so I'm super confused and would love some advice.

https://www.safaribooksonline.com/library/view/hadoop-application-architectures/9781491910313/ch01.html

I'm using Spark 1.0.0, and apparently the default value for spark.sql.shuffle.partitions is 200, so it can't be that. In fact, all the defaults for parallelism are much greater than 1, so I don't understand what's going on.

Upvotes: 19

Views: 33458

Answers (6)

auxten

Reputation: 912

You can just:

# install chdb (an embedded ClickHouse engine) and use it to rewrite the
# single parquet file as a new file containing the first 10M rows
pip install chdb
python3 -m chdb \
   "SELECT * FROM \`hits.parquet\` \
   LIMIT 10000000 \
   INTO OUTFILE 'hits10m.parquet'"

Upvotes: 0

F Pereira

Reputation: 1367

The new way of doing it (Spark 2.x) is to set

spark.sql.files.maxPartitionBytes

Source: https://issues.apache.org/jira/browse/SPARK-17998 (the official documentation is not yet correct; it misses the .sql prefix)

In my experience, the Hadoop settings no longer have any effect.
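
For instance, a minimal sketch (assuming a Spark 2.x SparkSession and a hypothetical input path):

import org.apache.spark.sql.SparkSession

// Cap each read partition at ~16 MB instead of the 128 MB default,
// so a single large parquet file is split across many tasks.
val spark = SparkSession.builder()
  .appName("split-parquet-example")
  .config("spark.sql.files.maxPartitionBytes", "16777216") // 16 MB, in bytes
  .getOrCreate()

// Hypothetical path; replace with your own file.
val df = spark.read.parquet("/data/the-big-table.parquet")
println(df.rdd.getNumPartitions)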

Upvotes: 2

C4stor

Reputation: 8036

You should write your parquet files with a smaller block size. The default is 128 MB per block, but it's configurable by setting the parquet.block.size configuration in the writer.

The source of ParquetOutputFormat is here, if you want to dig into the details.

The block size is the minimum amount of logically readable data you can read out of a parquet file (since parquet is columnar, you can't just split it by line or anything similarly trivial), so you can't have more reading threads than input blocks.
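
A rough sketch of doing that from a spark-shell (assumes an existing SparkContext sc and DataFrame df; the path and 16 MB value are hypothetical, and the block size is passed through the Hadoop configuration, which the parquet writer reads):

// Ask the parquet writer for ~16 MB row groups instead of the 128 MB default.
sc.hadoopConfiguration.setInt("parquet.block.size", 16 * 1024 * 1024)

// Hypothetical DataFrame; parquet written afterwards uses the smaller blocks,
// so a later read of a single file can be split into more tasks.
df.write.parquet("/data/small-blocks.parquet")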

Upvotes: 13

Prokod

Reputation: 1

To achieve that, you should use the SparkContext to set the Hadoop configuration property mapreduce.input.fileinputformat.split.maxsize (via sc.hadoopConfiguration).

By setting this property to a value lower than hdfs.blockSize, you will get as many partitions as there are splits.

For example:
when hdfs.blockSize = 134217728 (128 MB),
and a file is read that contains exactly one full block,
and mapreduce.input.fileinputformat.split.maxsize = 67108864 (64 MB),

then there will be two partitions that those splits will be read into.
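
A minimal sketch of the above, assuming a Spark 1.x spark-shell with sc and sqlContext and a hypothetical single-block file (following the same parquetFile pattern used in the answer below):

// Cap input splits at 64 MB, i.e. half of a 128 MB HDFS block.
sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")

// Hypothetical path; with the cap above, the one-block file should load as 2 partitions.
val table = sqlContext.parquetFile("/data/one-block-table.parquet")
println(table.partitions.length)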

Upvotes: 0

Ruslan Pelin

Reputation: 11

You have mentioned that you want to control the distribution during the write to parquet. When you create parquet from RDDs, parquet preserves the partitions of the RDD. So, if you create an RDD with 100 partitions and write it out as a dataframe in parquet format, it will write 100 separate parquet files to the filesystem. For reads you can specify the spark.sql.shuffle.partitions parameter.
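
As a rough sketch of that write-side control (Spark 1.4+ DataFrame API, assuming a spark-shell with sqlContext; the paths are hypothetical):

// Repartition to 100 before writing; the writer produces one part-file per partition.
val df = sqlContext.read.parquet("/data/input.parquet")
df.repartition(100).write.parquet("/data/out-100-parts.parquet")

// On the read/shuffle side, the number of post-shuffle partitions is controlled by:
sqlContext.setConf("spark.sql.shuffle.partitions", "100")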

Upvotes: 1

suztomo

Reputation: 5202

Maybe your parquet file only takes up one HDFS block. Create a big parquet file that spans many HDFS blocks and load it:

// parquetFile lives on the SQLContext, not the SparkContext, in Spark 1.x
val k = sqlContext.parquetFile("the-big-table.parquet")
// number of partitions; per this answer, one per HDFS block
k.partitions.length

You'll see the same number of partitions as there are HDFS blocks. This worked fine for me (spark-1.1.0).

Upvotes: 1
