Aviral Kumar

Reputation: 824

Converting Dataframe to RDD reduces partitions

In our code, Dataframe was created as :

DataFrame DF = hiveContext.sql("select * from table_instance");

When I convert my dataframe to rdd and try to get its number of partitions as

RDD<Row> newRDD = DF.rdd();
System.out.println(newRDD.getNumPartitions());

It reduces the number of partitions to 1 (1 is printed to the console). Originally my dataframe had 102 partitions.

UPDATE:

While reading, I repartitioned the dataframe:

DataFrame DF = hiveContext.sql("select * from table_instance").repartition(200);

and then converted it to an rdd, so it gave me 200 partitions. Does

JavaSparkContext

have a role to play in this? When we convert a dataframe to an rdd, is the default minimum partitions flag also considered at the spark context level?
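A minimal way to inspect those context-level defaults (assuming an existing JavaSparkContext named jsc; the name is just for illustration):

// Context-level defaults that some sources fall back on when no
// partition count is given explicitly:
System.out.println(jsc.defaultMinPartitions());
System.out.println(jsc.defaultParallelism());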

UPDATE:

I made a separate sample program in which I read the exact same table into a dataframe and converted it to an rdd. No extra stage was created for the RDD conversion, and the partition count was also correct. I am now wondering what I am doing differently in my main program.

Please let me know if my understanding is wrong here.

Upvotes: 3

Views: 1984

Answers (1)

Darshan

Reputation: 2333

It basically depends on the implementation of hiveContext.sql(). Since I am new to Hive, my guess is that hiveContext.sql() either doesn't know how, or is unable, to split the data present in the table.

For example, when you read a text file from HDFS, the spark context considers the number of HDFS blocks occupied by that file to determine the number of partitions.
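A sketch of that behaviour (the HDFS path and the jsc context are hypothetical; the minPartitions argument is only a lower bound):

// The partition count follows the file's input splits (typically one
// per HDFS block), never fewer than the requested minimum of 4.
JavaRDD<String> lines = jsc.textFile("hdfs:///tmp/some_file.txt", 4);
System.out.println(lines.getNumPartitions());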

What you did with repartition is the obvious solution for these kinds of problems. (Note: repartition may cause a shuffle operation if a proper partitioner is not used; a HashPartitioner is used by default.)

Coming to your doubt, hiveContext may consider the default minimum partitions property. But relying on the default property is not going to solve all your problems. For instance, if your hive table's size increases, your program will still use the default number of partitions.
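One way around that is to derive the partition count from the input size instead of hard-coding it. A rough sketch (the sizes below are assumptions for illustration; in practice you could read the table size from Hive table statistics):

long tableSizeBytes = 64L * 1024 * 1024 * 1024; // assumed table size: 64 GB
long bytesPerPartition = 128L * 1024 * 1024;    // target roughly one HDFS block per partition
int numPartitions = (int) Math.max(1, tableSizeBytes / bytesPerPartition);
DataFrame DF = hiveContext.sql("select * from table_instance").repartition(numPartitions);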

Update: Avoid shuffle during repartition

Define your custom partitioner:

import org.apache.spark.HashPartitioner;

public class MyPartitioner extends HashPartitioner {
    private final int partitions;

    public MyPartitioner(int partitions) {
        super(partitions); // HashPartitioner has no no-arg constructor
        this.partitions = partitions;
    }

    @Override
    public int numPartitions() {
        return this.partitions;
    }

    @Override
    public int getPartition(Object key) {
        if (key instanceof String) {
            return super.getPartition(key);
        } else if (key instanceof Integer) {
            // floorMod keeps the result non-negative even for negative keys
            return Math.floorMod((Integer) key, this.partitions);
        } else if (key instanceof Long) {
            return (int) Math.floorMod((Long) key, (long) this.partitions);
        }
        // TODO ... add more types
        return super.getPartition(key); // fall back to hashing for other key types
    }
}

Use your custom partitioner:

JavaPairRDD<Long, SparkDatoinDoc> pairRdd = hiveContext.sql("select * from table_instance")
        .javaRDD()  // DataFrame has no mapToPair; go through JavaRDD<Row> first
        .mapToPair(row -> { /* TODO ... expose the column as key */ });

pairRdd = pairRdd.partitionBy(new MyPartitioner(200));
// ... rest of processing
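To confirm the partitioner took effect, a quick check on the result (using the pairRdd from above):

System.out.println(pairRdd.getNumPartitions()); // expected: 200
System.out.println(pairRdd.partitioner());      // expected: an Optional containing MyPartitioner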

Upvotes: 1
