Reputation: 1965
I have a requirement to load data from a Hive table using Spark SQL HiveContext
and write it to HDFS. By default, the DataFrame
produced by the SQL has 2 partitions. To get more parallelism I need more partitions out of the SQL. There is no overloaded method in HiveContext that takes a number-of-partitions parameter.
Repartitioning the RDD causes shuffling and results in more processing time.
val result = sqlContext.sql("select * from bt_st_ent")
This produces the following log output:
Starting task 0.0 in stage 131.0 (TID 297, aster1.com, partition 0,NODE_LOCAL, 2203 bytes)
Starting task 1.0 in stage 131.0 (TID 298, aster1.com, partition 1,NODE_LOCAL, 2204 bytes)
I would like to know whether there is any way to increase the number of partitions of the SQL output.
Upvotes: 27
Views: 28484
Reputation: 330433
Spark < 2.0:
You can use the Hadoop configuration options:
mapred.min.split.size
mapred.max.split.size
as well as the HDFS block size to control partition size for filesystem-based formats*.
val minSplit: Int = ???
val maxSplit: Int = ???
sc.hadoopConfiguration.setInt("mapred.min.split.size", minSplit)
sc.hadoopConfiguration.setInt("mapred.max.split.size", maxSplit)
Spark 2.0+:
You can use the spark.sql.files.maxPartitionBytes
configuration:
spark.conf.set("spark.sql.files.maxPartitionBytes", maxSplit)
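For example (illustrative value, assuming a Spark 2.x SparkSession named spark and a table backed by splittable files such as Parquet or ORC):
// Cap each input partition at roughly 32 MB (illustrative value).
spark.conf.set("spark.sql.files.maxPartitionBytes", 32 * 1024 * 1024L)
val result = spark.sql("select * from bt_st_ent")
println(result.rdd.getNumPartitions)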
In both cases these values may not be honored by a specific data source API, so you should always check the documentation / implementation details of the format you use.
* Other input formats can use different settings. See for example
Furthermore, Datasets created from RDDs will inherit the partition layout of their parents.
Similarly, bucketed tables will use the bucket layout defined in the metastore, with a 1:1 relationship between buckets and Dataset partitions.
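To illustrate the first point, a minimal sketch (assuming a Spark 2.x SparkSession named spark):
import spark.implicits._
// An RDD with an explicit number of partitions...
val rdd = spark.sparkContext.parallelize(1 to 1000, numSlices = 8)
// ...keeps that layout when converted to a Dataset.
val ds = rdd.toDS()
println(ds.rdd.getNumPartitions)  // 8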
Upvotes: 21
Reputation: 1630
If your SQL performs a shuffle (for example, it has a join or some sort of group by), you can set the number of partitions by setting the spark.sql.shuffle.partitions property:
sqlContext.setConf("spark.sql.shuffle.partitions", "64")
Following up on what Fokko suggests, you could use a random variable to cluster by.
// floor(rand() * 64) spreads rows over 64 pseudo-random buckets.
val result = sqlContext.sql("""
  select * from (
    select *, floor(rand() * 64) as rand_part from bt_st_ent
  ) t
  cluster by rand_part""")
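Since the cluster by forces an exchange, the output should end up with spark.sql.shuffle.partitions partitions, which you can sanity-check with:
println(result.rdd.getNumPartitions)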
Upvotes: 3
Reputation: 2250
A very common and painful problem. You should look for a key that distributes the data into uniform partitions. Then you can use the DISTRIBUTE BY
and CLUSTER BY
operators to tell Spark how to group rows into partitions. This will incur some overhead on the query itself, but will result in evenly sized partitions. Deepsense has a very good tutorial on this.
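For example, assuming a hypothetical column transaction_date that spreads rows fairly evenly, something like:
// transaction_date is a made-up column; pick one with a roughly uniform distribution.
val result = sqlContext.sql("""
  select * from bt_st_ent
  distribute by transaction_date""")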
Upvotes: 5