Reputation: 1023
I am using Spark 2.1.0.
When I try to use a window function on a DataFrame:
val winspec = Window.partitionBy("partition_column")
DF.withColumn("column", avg(DF("col_name")).over(winspec))
my plan changes and the lines below are added to the physical plan. This introduces an extra stage and an extra shuffle. Because the data is huge, the query slows down badly and runs for hours.
+- Window [avg(cast(someColumn#262 as double)) windowspecdefinition(partition_column#460, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS someColumn#263], [partition_column#460]
+- *Sort [partition_column#460 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(partition_column#460, 200)
I also see a stage named MapInternalPartition, which I assume means the data is partitioned internally, but I don't know what it is. I suspect it is the reason my 100 tasks took 30+ minutes: 99 of them completed within 1-2 minutes, and the last task took the remaining 30 minutes, leaving my cluster idle with no parallel processing. This makes me wonder whether the data is partitioned properly when a window function is used.
I tried to apply hash partitioning by converting the DataFrame to an RDD, because we cannot apply a custom partitioner or a HashPartitioner to a DataFrame.
So if I do this:
val myVal = DF.rdd.partitioner(new HashPartitioner(10000))
I get a return type of Any, on which no actions are available.
I checked and saw that the column used for partitioning in the window function contains only NULL values.
Upvotes: 0
Views: 168
Reputation: 35229
TL;DR:
Datasets cannot reuse RDD partitioning. Partitioning with a Dataset should be done with the repartition method:
df.repartition($"col_name")
But it won't help you because of 2)
And this:
val myVal = DF.rdd.partitioner(new HashPartitioner(10000))
wouldn't return Any. It wouldn't compile, as there is no partitioner method for RDD that takes arguments.
The correct method is partitionBy, but it is not applicable to RDD[Row], and it wouldn't help you because of 3).
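To illustrate why partitionBy does not apply to RDD[Row] directly, here is a minimal sketch. It assumes a local SparkSession, made-up sample data, and a hypothetical helper named partitionByKey; only the column names come from the question. partitionBy is defined for key-value RDDs, so the rows have to be keyed first:

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object PartitionBySketch {
  // Hypothetical helper: partitionBy exists only for RDD[(K, V)],
  // so each Row is paired with its key before partitioning.
  def partitionByKey(df: DataFrame, numPartitions: Int): RDD[(String, Row)] =
    df.rdd
      .keyBy(row => row.getAs[String]("partition_column"))
      .partitionBy(new HashPartitioner(numPartitions))

  def main(args: Array[String]): Unit = {
    // Local session and sample data are assumptions for illustration only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("partitionBy-sketch")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(("a", 1.0), ("a", 3.0), ("b", 5.0))
      .toDF("partition_column", "col_name")

    val partitioned = partitionByKey(df, 4)
    println(partitioned.partitioner.isDefined)
    println(partitioned.getNumPartitions)

    // Converting back to a DataFrame is possible, but the planner does not
    // use the RDD's partitioner, so a window on this DataFrame shuffles again.
    val back = spark.createDataFrame(partitioned.values, df.schema)
    back.show()

    spark.stop()
  }
}
```

Even when this compiles and runs, the partitioner information is not carried into the DataFrame plan, which is why the RDD route does not remove the Exchange.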
If there is enough memory, you can try:
import org.apache.spark.sql.functions.{avg, broadcast}
df.join(
  broadcast(df.groupBy("partition_column").agg(avg(df("col_name")))),
  Seq("partition_column")
)
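One caveat with a join on the key column: an ordinary equality join does not match NULL keys, so with an inner join the rows whose partition_column is NULL drop out of the result. That matters here, since the column in the question is all NULL. Below is a sketch of a null-safe variant using the `<=>` operator. The helper name withGroupAvg, the output column group_avg, the temporary column agg_key, and the sample data are assumptions for illustration:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{avg, broadcast, col}

object BroadcastAvgSketch {
  // Hypothetical helper: attaches the per-group average to every row.
  def withGroupAvg(df: DataFrame): DataFrame = {
    // groupBy puts all NULL keys into a single group.
    val agg = df.groupBy("partition_column")
      .agg(avg("col_name").as("group_avg"))
      .withColumnRenamed("partition_column", "agg_key")

    // <=> is null-safe equality: NULL <=> NULL evaluates to true.
    df.join(broadcast(agg), col("partition_column") <=> col("agg_key"))
      .drop("agg_key")
  }

  def main(args: Array[String]): Unit = {
    // Local session and sample data are assumptions for illustration only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("broadcast-avg-sketch")
      .getOrCreate()
    import spark.implicits._

    val df = Seq[(Option[String], Double)](
      (Some("a"), 1.0), (Some("a"), 3.0), (None, 5.0)
    ).toDF("partition_column", "col_name")

    withGroupAvg(df).show()
    spark.stop()
  }
}
```

The aggregation still shuffles, but groupBy with avg pre-aggregates within each input partition, so only small partial results are exchanged rather than every row.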
Edit:
If you're trying to compute a running average (avg with Window.partitionBy("partition_column") computes the global average by group, not a running average), then you're out of luck.
If the partitioning column has only NULLs, then the task is not distributed and runs fully sequentially: every row has the same key, so all rows end up in a single partition.
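A quick way to confirm this kind of skew is to count rows per key before applying the window. This is a minimal sketch with a hypothetical helper keyCounts and made-up sample data in which every key is NULL, mirroring the situation described in the question:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.desc

object KeySkewCheck {
  // Hypothetical helper: row count per key, largest group first.
  // A single dominant key (here NULL) means a single long-running window task.
  def keyCounts(df: DataFrame): DataFrame =
    df.groupBy("partition_column").count().orderBy(desc("count"))

  def main(args: Array[String]): Unit = {
    // Local session and sample data are assumptions for illustration only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("key-skew-check")
      .getOrCreate()
    import spark.implicits._

    val df = Seq[(Option[String], Double)](
      (None, 1.0), (None, 2.0), (None, 3.0)
    ).toDF("partition_column", "col_name")

    keyCounts(df).show()
    spark.stop()
  }
}
```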
To compute a global running average, you can try to apply logic similar to this: How to compute cumulative sum using Spark.
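As a rough sketch of that idea (a common two-pass technique, not necessarily the exact code in the linked answer): compute a (sum, count) pair per partition, turn those into offsets on the driver, then scan each partition starting from its offset. The helper name runningAvg is hypothetical, and the sketch assumes the RDD is already in the desired order and that its order is stable across both passes (for example, sorted and cached beforehand):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object RunningAverageSketch {
  // Hypothetical helper: running average over the RDD in its current order.
  def runningAvg(values: RDD[Double]): RDD[Double] = {
    // Pass 1: one (sum, count) pair per partition, collected in partition order.
    val stats: Array[(Double, Long)] = values
      .mapPartitions { it =>
        Iterator(it.foldLeft((0.0, 0L)) { case ((s, n), x) => (s + x, n + 1) })
      }
      .collect()

    // offsets(i) holds the total sum and count of all partitions before i.
    val offsets: Array[(Double, Long)] =
      stats.scanLeft((0.0, 0L)) { case ((s, n), (ps, pn)) => (s + ps, n + pn) }

    // Pass 2: scan each partition, starting from its offset.
    values.mapPartitionsWithIndex { (i, it) =>
      val start = offsets(i)
      it.scanLeft(start) { case ((s, n), x) => (s + x, n + 1) }
        .drop(1) // skip the initial offset element
        .map { case (s, n) => s / n }
    }
  }

  def main(args: Array[String]): Unit = {
    // Local session and sample data are assumptions for illustration only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("running-avg-sketch")
      .getOrCreate()

    val values = spark.sparkContext.parallelize(Seq(1.0, 2.0, 3.0, 4.0), 2)
    runningAvg(values).collect().foreach(println)

    spark.stop()
  }
}
```

Each partition is processed in parallel in both passes, and only one small pair per partition reaches the driver, so nothing forces the whole dataset through a single task.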
Upvotes: 4