AJm

Reputation: 1023

Unable to Convert to DataFrame from RDD after applying partitioning

I am using Spark 2.1.0

When I try to use a Window function on a DataFrame:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.avg
val winspec = Window.partitionBy("partition_column")
DF.withColumn("column", avg(DF("col_name")).over(winspec))

my plan changes and the lines below are added to the physical plan. Because of this there is an extra stage and extra shuffling, and since the data is huge this slows my query down drastically and it runs for hours.

+- Window [avg(cast(someColumn#262 as double)) windowspecdefinition(partition_column#460, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS someColumn#263], [partition_column#460]
   +- *Sort [partition_column#460 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(partition_column#460, 200)

I also see a stage named MapInternalPartition, which I think means the data is partitioned internally. I don't know what this is, but I think it is why my 100 tasks take 30+ minutes: 99 of them complete within 1-2 minutes and the last task takes the remaining 30 minutes, leaving my cluster idle with no parallel processing. This makes me wonder: is the data partitioned properly when a Window function is used?

I tried to apply hash partitioning by converting the DataFrame to an RDD, because we cannot apply a custom partitioner or HashPartitioner to a DataFrame.

So if I do this:

val myVal = DF.rdd.partitioner(new HashPartitioner(10000))

I get back a value of type Any, on which I cannot perform any actions.

I also checked and saw that the column used for partitioning in the Window function contains only NULL values.

Upvotes: 0

Views: 168

Answers (1)

Alper t. Turker

Reputation: 35229

TL;DR:

  1. A shuffle when using window functions is not extra. It is required for correctness and cannot be removed.
  2. Applying a partitioner causes a shuffle as well.
  3. Datasets cannot reuse RDD partitioning. Partitioning a Dataset should be done with the repartition method:

    df.repartition($"col_name")
    
  4. But it won't help you because of 2)

  5. And this:

    val myVal = DF.rdd.partitioner(new HashPartitioner(10000))
    

    wouldn't return Any. It wouldn't compile, because RDD has no partitioner method that takes arguments.

    The correct method is partitionBy, but it is not applicable to RDD[Row] (only to pair RDDs, see the sketch after this list), and it wouldn't help you because of 3).
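For reference, a minimal sketch of what using partitionBy would look like (the key extraction is hypothetical; as 2) and 3) say, it shuffles and the resulting partitioning cannot be reused by a Dataset anyway):

import org.apache.spark.HashPartitioner

// partitionBy is only available on pair RDDs, so key each Row by the partition column first
val keyed = df.rdd
  .map(row => (row.getAs[Any]("partition_column"), row))
  .partitionBy(new HashPartitioner(200))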

If there is enough memory you can try

import org.apache.spark.sql.functions.{avg, broadcast}

df.join(
  broadcast(df.groupBy("partition_column").agg(avg(df("col_name")))),
  Seq("partition_column")
)
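This computes the per-group average with a plain aggregation (only the partially aggregated rows are shuffled), broadcasts the small result, and joins it back, so the large DataFrame itself is never sorted or shuffled for a window.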

Edit:

If you're trying to compute a running average (avg with Window.partitionBy("partition_column") computes the overall average per group, not a running average), then you're out of luck.
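For comparison, a running average needs an ordering within the window. A minimal sketch, assuming a hypothetical ordering column "ts":

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.avg

// Running average per group, ordered by the hypothetical "ts" column
val runningSpec = Window
  .partitionBy("partition_column")
  .orderBy("ts")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df.withColumn("running_avg", avg(df("col_name")).over(runningSpec))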

If the partitioning column contains only NULLs, then the work is not distributed and runs fully sequentially, because all rows hash to the same partition.

To compute a global running average you can try to apply logic similar to this: How to compute cumulative sum using Spark.
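A rough sketch of the global variant (not taken from the linked answer; note that without partitionBy Spark moves all data to a single partition, so this only works if the data fits on one executor):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.avg

// Global running average: no partitionBy, ordered by the hypothetical "ts" column,
// so Spark will warn about the missing partition spec and use a single partition
val globalSpec = Window
  .orderBy("ts")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df.withColumn("global_running_avg", avg(df("col_name")).over(globalSpec))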

Upvotes: 4
