nobody

Reputation: 11090

pyspark - groupby multiple columns/count performance

I have the following code that takes hours to execute on a large dataframe (billions of records). I have read that groupBy is expensive and should be avoided. Our Spark version is spark-2.0.1.

from pyspark.sql import Window
from pyspark.sql.functions import col, rank

counts = df.groupby("_c1", "_c2", "_c3", "_c4", "_c5").count()
window = Window.partitionBy(counts['_c1']).orderBy(counts['count'].desc())
counts.select('*', rank().over(window).alias('rank')) \
      .filter(col('rank') == 1)

Is there an alternative/better way to group by multiple columns, count, and get the row with the highest count for each group?

Upvotes: 0

Views: 2649

Answers (2)

Luiz

Reputation: 11

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, concat_ws, lit, sha2}

df.transform(with_qtd_repeat(bulk))

// Hashes every column of `bulk` into a single value per row, counts how many of
// those hashes occur more than once, and attaches that number to `monitor`.
def with_qtd_repeat(bulk: DataFrame)(monitor: DataFrame): DataFrame = {
  val bulk_hash = bulk.withColumn("hash",
    sha2(concat_ws(",", bulk.columns.map(col): _*), 256))
  val qtd_repeat = bulk_hash.groupBy(col("hash"))
    .count().filter(col("count") >= 2).count
  monitor.withColumn("qtd_repeat", lit(qtd_repeat))
}
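
For reference, a rough PySpark sketch of the same idea (hash all of the columns into one value, then count how many of those hashes repeat). The names bulk and monitor simply mirror the Scala snippet above, and the curried form is replaced by a plain two-argument function; this is illustrative only, not code from the answer:

from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def with_qtd_repeat(bulk: DataFrame, monitor: DataFrame) -> DataFrame:
    # Hash every column of `bulk` into a single value per row.
    bulk_hash = bulk.withColumn(
        "hash", F.sha2(F.concat_ws(",", *[F.col(c) for c in bulk.columns]), 256))
    # Number of hash values that occur more than once.
    qtd_repeat = bulk_hash.groupBy("hash").count().filter(F.col("count") >= 2).count()
    return monitor.withColumn("qtd_repeat", F.lit(qtd_repeat))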

Upvotes: 1

nobody

Reputation: 11090

Repartitioning the dataframe on column "_c1" before calling the groupBy brought a marked improvement in performance (source).

from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col, rank

# Collapse the extra grouping columns into a single derived key.
df = df.withColumn("ids",
                   F.concat_ws("|", F.col("_c2"), F.col("_c3"), F.col("_c4"), F.col("_c5"))) \
       .select("_c1", "ids")

# Repartition on "_c1" once, then aggregate.
counts = df.repartition(F.col("_c1")).groupby("_c1", "ids").count()

# Keep the row with the highest count per "_c1".
window = Window.partitionBy(counts['_c1']).orderBy(counts['count'].desc())
counts.select('*', rank().over(window).alias('rank')) \
      .filter(col('rank') == 1)
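
As a quick, optional sanity check (using the df with the derived ids column from the snippet above), you can compare the physical plans with and without the explicit repartition. The expectation on this Spark version is that the grouping and the window can reuse the existing hash partitioning on "_c1", so the repartitioned plan should not need an additional shuffle in front of the aggregation:

# Physical plan when the aggregation has to plan its own shuffle
df.groupby("_c1", "ids").count().explain()

# Physical plan with the explicit repartition; the Exchange it introduces
# should be the only shuffle feeding the aggregation
df.repartition(F.col("_c1")).groupby("_c1", "ids").count().explain()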

Upvotes: 1
