Reputation: 824
In our code, the DataFrame was created as:
DataFrame DF = hiveContext.sql("select * from table_instance");
When I convert my DataFrame to an RDD and try to get its number of partitions:
RDD<Row> newRDD = DF.rdd();
System.out.println(newRDD.getNumPartitions());
the number of partitions drops to 1 (1 is printed in the console). Originally my DataFrame had 102 partitions.
UPDATE:
While reading, I repartitioned the DataFrame:
DataFrame DF = hiveContext.sql("select * from table_instance").repartition(200);
and then converted it to an RDD, which gave me 200 partitions. Does JavaSparkContext have a role to play in this? When we convert a DataFrame to an RDD, is the default minimum partitions flag at the SparkContext level also taken into account?
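For reference, the context-level defaults can be printed like this (just a sketch; jsc here stands for the JavaSparkContext):
// the defaults that file-based reads such as textFile fall back to
System.out.println("defaultParallelism: " + jsc.defaultParallelism());
System.out.println("defaultMinPartitions: " + jsc.defaultMinPartitions());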
UPDATE:
I made a separate sample program in which I read the exact same table into a DataFrame and converted it to an RDD. No extra stage was created for the RDD conversion and the partition count was also correct. I am now wondering what I am doing differently in my main program.
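A minimal version of that standalone check looks roughly like this (just a sketch of the program's shape; the class and app names are illustrative, Spark 1.x Java API):
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;

public class PartitionCheck {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("PartitionCheck");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        HiveContext hiveContext = new HiveContext(jsc.sc());

        DataFrame df = hiveContext.sql("select * from table_instance");
        RDD<Row> rdd = df.rdd();
        // in this standalone program the printed count matches the table's splits (no extra stage)
        System.out.println(rdd.getNumPartitions());

        jsc.stop();
    }
}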
Please let me know if my understanding is wrong here.
Upvotes: 3
Views: 1984
Reputation: 2333
It basically depends on the implementation of hiveContext.sql(). Since I am new to Hive, my guess is that hiveContext.sql() doesn't know how, or is not able, to split the data present in the table.
For example, when you read a text file from HDFS, the SparkContext uses the number of HDFS blocks occupied by that file to determine the number of partitions.
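A quick way to see this (sketch only; the path is illustrative, jsc is a JavaSparkContext, and it uses the same getNumPartitions call as in your question):
// the partition count follows the file's input splits/blocks,
// bounded below by jsc.defaultMinPartitions()
JavaRDD<String> lines = jsc.textFile("hdfs:///some/path/file.txt");
System.out.println(lines.getNumPartitions());
// a higher minimum can also be requested explicitly
JavaRDD<String> moreParts = jsc.textFile("hdfs:///some/path/file.txt", 64);
System.out.println(moreParts.getNumPartitions());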
What you did with repartition is the obvious solution for these kinds of problems. (Note: repartition may cause a shuffle operation if a proper partitioner is not used; a HashPartitioner is used by default.)
Coming to your doubt, hiveContext may consider the default minimum partitions property. But relying on the default property is not going to solve all your problems: for instance, if your Hive table's size increases, your program will still use the default number of partitions.
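If you want to avoid a hard-coded count like 200, one option is to derive it from the data itself. A rough sketch (ROWS_PER_PARTITION is just an illustrative tuning value, not a Spark setting, and the extra count() means one more pass over the table):
final long ROWS_PER_PARTITION = 500000L;
DataFrame df = hiveContext.sql("select * from table_instance");
long rowCount = df.count();
// round up so that even a small table gets at least one partition
int numPartitions = (int) Math.max(1L, (rowCount + ROWS_PER_PARTITION - 1) / ROWS_PER_PARTITION);
DataFrame repartitioned = df.repartition(numPartitions);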
Update: Avoid shuffle during repartition
Define your custom partitioner:
import org.apache.spark.HashPartitioner;

public class MyPartitioner extends HashPartitioner {

    private final int partitions;

    public MyPartitioner(int partitions) {
        super(partitions);               // HashPartitioner has no no-arg constructor
        this.partitions = partitions;
    }

    @Override
    public int numPartitions() {
        return this.partitions;
    }

    @Override
    public int getPartition(Object key) {
        if (key instanceof String) {
            return super.getPartition(key);                              // plain hash partitioning for strings
        } else if (key instanceof Integer) {
            return Integer.valueOf(key.toString()) % this.partitions;    // assumes non-negative keys
        } else if (key instanceof Long) {
            return (int) (Long.valueOf(key.toString()) % this.partitions);
        }
        // TODO ... add more types; fall back to hash partitioning for anything else
        return super.getPartition(key);
    }
}
Use your custom partitioner:
JavaPairRDD<Long, SparkDatoinDoc> pairRdd = hiveContext.sql("select * from table_instance")
        .javaRDD()
        .mapToPair(/* TODO ... expose the column as key */);
pairRdd = pairRdd.partitionBy(new MyPartitioner(200));
// ... rest of the processing
Upvotes: 1