Reputation: 11090
I have the following statement that is taking hours to execute on a large DataFrame (billions of records). I read that groupby is expensive and needs to be avoided. Our Spark version is spark-2.0.1.
from pyspark.sql import Window
from pyspark.sql.functions import col, rank

df = df.groupby("_c1", "_c2", "_c3", "_c4", "_c5").count()
window = Window.partitionBy(df['_c1']).orderBy(df['count'].desc())
df.select('*', rank().over(window).alias('rank')) \
    .filter(col('rank') == 1)
Is there an alternative/better way to group by multiple columns, count, and get the row with the highest count for each group?
Upvotes: 0
Views: 2649
Reputation: 11
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, concat_ws, lit, sha2}

df.transform(with_qtd_repeat(bulk))

// Hashes every row of `bulk`, counts how many hashes occur at least twice,
// and attaches that number to `monitor` as a constant column.
def with_qtd_repeat(bulk: DataFrame)(monitor: DataFrame): DataFrame = {
  val bulk_hash = bulk.withColumn("hash",
    sha2(concat_ws(",", bulk.columns.map(col): _*), 256))
  val qtd_repeat = bulk_hash.groupBy(col("hash"))
    .count().filter(col("count") >= 2).count
  monitor.withColumn("qtd_repeat", lit(qtd_repeat))
}
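For readers working in PySpark like the question, a rough equivalent of this hash-based duplicate count might look like the sketch below (assuming bulk and monitor DataFrames play the same roles as in the Scala code above):

from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def with_qtd_repeat(bulk: DataFrame, monitor: DataFrame) -> DataFrame:
    # Hash each full row into one column, count hashes that occur at least twice,
    # and attach that number to monitor as a constant column.
    bulk_hash = bulk.withColumn("hash", F.sha2(F.concat_ws(",", *bulk.columns), 256))
    qtd_repeat = bulk_hash.groupBy("hash").count().filter(F.col("count") >= 2).count()
    return monitor.withColumn("qtd_repeat", F.lit(qtd_repeat))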
Upvotes: 1
Reputation: 11090
Repartitioning the dataframe on column "_c1" before calling the groupby brought a marked improvement in performance. Source
from pyspark.sql import Window, functions as F
from pyspark.sql.functions import col, rank

df = df.withColumn("ids", \
        F.concat_ws("|", F.col("_c2"), F.col("_c3"), F.col("_c4"), F.col("_c5"))) \
    .select("_c1", "ids")
# Repartition on "_c1" before the groupby (the change credited with the speed-up).
df = df.repartition(F.col("_c1")).groupby("_c1", "ids").count()
window = Window.partitionBy(df['_c1']).orderBy(df['count'].desc())
df.select('*', rank().over(window).alias('rank')) \
    .filter(col('rank') == 1)
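If exactly one row per "_c1" group is needed even when counts tie, a small variation on the window step is to use row_number() instead of rank(): rank() keeps every tied row, while row_number() returns a single row per partition. A minimal sketch, assuming df is the grouped-and-counted DataFrame from above:

from pyspark.sql import Window
from pyspark.sql import functions as F

window = Window.partitionBy("_c1").orderBy(F.col("count").desc())
df.withColumn("rn", F.row_number().over(window)) \
    .filter(F.col("rn") == 1) \
    .drop("rn")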
Upvotes: 1