Reputation: 31553
So I have just one Parquet file that I'm reading with Spark (using the SQL stuff) and I'd like it to be processed with 100 partitions. I've tried setting spark.default.parallelism to 100, and we've also tried changing the compression of the Parquet file to none (from gzip). No matter what we do, the first stage of the Spark job has only a single partition (once a shuffle occurs, it gets repartitioned into 100 and from then on things are obviously much, much faster).
Now, according to a few sources (like the one below), Parquet should be splittable (even when using gzip!), so I'm super confused and would love some advice.
I'm using Spark 1.0.0, and apparently the default value for spark.sql.shuffle.partitions is 200, so it can't be that. In fact, all the defaults for parallelism are much greater than 1, so I don't understand what's going on.
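For reference, the context is set up roughly like this (just a sketch of the configuration described above; the app name is made up):
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: set spark.default.parallelism to 100 before creating the context.
// As described above, this does not change the partition count of the first (read) stage.
val conf = new SparkConf().setAppName("parquet-read").set("spark.default.parallelism", "100")
val sc = new SparkContext(conf)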
Upvotes: 19
Views: 33458
Reputation: 912
You can just:
pip install chdb
python3 -m chdb \
"SELECT * FROM \`hits.parquet\` \
LIMIT 10000000 \
INTO OUTFILE 'hits10m.parquet'"
Upvotes: 0
Reputation: 1367
The new way of doing it (Spark 2.x) is to set spark.sql.files.maxPartitionBytes.
Source: https://issues.apache.org/jira/browse/SPARK-17998 (the official documentation is not correct yet; it misses the .sql part).
In my experience, the Hadoop settings no longer have any effect.
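For example (just a sketch, assuming the Spark 2.x SparkSession API; the file name and the 16 MB value are only illustrative):
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("parquet-split").getOrCreate()
// Cap each input partition at roughly 16 MB (value in bytes) so a single large
// Parquet file is spread across many read tasks.
spark.conf.set("spark.sql.files.maxPartitionBytes", (16 * 1024 * 1024).toString)
val df = spark.read.parquet("the-big-table.parquet")
println(df.rdd.getNumPartitions)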
Upvotes: 2
Reputation: 8036
You should write your Parquet files with a smaller block size. The default is 128 MB per block, but it's configurable by setting the parquet.block.size property in the writer.
The source of ParquetOutputFormat is here, if you want to dig into the details.
The block size is the minimum amount of data you can read out of a Parquet file that is logically readable (since Parquet is columnar, you can't just split it by line or anything similarly trivial), so you can't have more reading threads than input blocks.
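A rough sketch of such a writer, assuming the Spark 1.x SchemaRDD API and the Hadoop-configuration route (the 16 MB value and file names are only illustrative):
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)   // sc: an existing SparkContext
// Ask the Parquet writer for ~16 MB row groups instead of the 128 MB default.
sc.hadoopConfiguration.setInt("parquet.block.size", 16 * 1024 * 1024)
val data = sqlContext.parquetFile("the-big-table.parquet")   // hypothetical input
data.saveAsParquetFile("smaller-blocks.parquet")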
Upvotes: 13
Reputation: 1
To achieve that, you should use SparkContext to set the Hadoop configuration (sc.hadoopConfiguration) property mapreduce.input.fileinputformat.split.maxsize.
By setting this property to a lower value than hdfs.blockSize, you will get as many partitions as there are splits.
For example:
when hdfs.blockSize = 134217728 (128 MB),
and one file is read which contains exactly one full block,
and mapreduce.input.fileinputformat.split.maxsize = 67108864 (64 MB),
then there will be two splits, read into two partitions.
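A sketch of that setup (mirroring the 64 MB value above; the file name is hypothetical and sc is an existing SparkContext):
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
// Cap each input split at 64 MB so a 128 MB HDFS block is read as two splits.
sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", (64 * 1024 * 1024).toString)
val k = sqlContext.parquetFile("the-big-table.parquet")
println(k.partitions.length)   // expect 2 for a file that fills exactly one 128 MB block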
Upvotes: 0
Reputation: 11
You mentioned that you want to control the distribution while writing to Parquet. When you create Parquet files from an RDD, Parquet preserves the partitions of the RDD. So if you create an RDD with 100 partitions and write it out as a DataFrame in Parquet format, it will write 100 separate Parquet files to the filesystem (see the sketch below).
For reads you can specify the spark.sql.shuffle.partitions parameter.
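A minimal sketch of the write side (assuming the DataFrame API of Spark 1.4+; df and the output path are illustrative):
// df: an existing DataFrame. Repartition to 100 before writing, so 100 part files
// land on the filesystem and a later read of that directory sees 100 separate files.
val df100 = df.repartition(100)
df100.write.parquet("hundred-parts.parquet")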
Upvotes: 1
Reputation: 5202
Maybe your Parquet file only takes up one HDFS block. Create a big Parquet file that spans many HDFS blocks and load it:
val k = sqlContext.parquetFile("the-big-table.parquet")   // parquetFile lives on SQLContext, not SparkContext
k.partitions.length
You'll see the same number of partitions as HDFS blocks. This worked fine for me (Spark 1.1.0).
Upvotes: 1