Evgenii

Reputation: 449

groupby dataframe takes too much time

I have a relatively large amount of data: 5000 ORC files, around 300 MB each, plus 4 dictionaries (a few KB each).

After loading it into DataFrames (around 1-2 hours) and joining, I try to group and aggregate it:

tableDS
  .join(broadcast(enq1), $"url".contains($"q1"), "left_outer")
  .join(broadcast(enq2), $"url".contains($"q2"), "left_outer")
  .join(broadcast(enq3), $"url".contains($"q3"), "left_outer")
  .join(broadcast(model), $"url".contains($"model"), "left_outer")
  .groupBy($"url", $"ctn", $"timestamp")
  .agg(
collect_set(when($"q2".isNotNull && $"q3".isNotNull && $"model".isNotNull, $"model").otherwise(lit(null))).alias("model"),
collect_set(when($"q1".isNotNull && $"q2".isNotNull && $"model".isNotNull, $"model").otherwise(lit(null))).alias("model2")
  )

It takes 15-16 hours.

My questions are:

1) Does groupBy for DataFrames work the same way as groupByKey for RDDs (shuffling all the data)? If it does, would moving to the Dataset methods groupByKey.reduceGroups, or even to RDD reduceByKey, improve performance?

2) Or is the problem with resources? There are 200 tasks performing the grouping; would increasing the number of these tasks help, and how would I even do it?

This is how I run it:

spark-submit \
--class Main \
--master yarn \
--deploy-mode client \
--num-executors 200 \
--executor-cores 20 \
--driver-memory 8G \
--executor-memory 16G \
--files hive-site.xml#hive-site.xml \
--conf spark.task.maxFailures=10 \
--conf spark.executor.memory=16G \
--conf spark.app.name=spark-job \
--conf spark.yarn.executor.memoryOverhead=4096 \
--conf spark.yarn.driver.memoryOverhead=2048 \
--conf spark.shuffle.service.enabled=true \
--conf spark.shuffle.consolidateFiles=true \
--conf spark.broadcast.compress=true \
--conf spark.shuffle.compress=true \
--conf spark.shuffle.spill.compress=true \

Upvotes: 0

Views: 1452

Answers (1)

user9961256

Reputation: 26

Does groupBy for DataFrames work the same way as groupByKey for RDDs

With collect_set it is pretty much the same as groupByKey. It might behave better if the number of duplicates is large; otherwise you can expect a performance profile similar to RDD.groupByKey, not accounting for differences in shuffle implementation.
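To make the comparison concrete, here is a rough RDD-level analogue of the groupBy + collect_set above (just a sketch: joined stands in for the result of your joins, and the column types are assumptions):

val byKey = joined.rdd
  .map(r => ((r.getAs[String]("url"), r.getAs[String]("ctn"), r.getAs[Long]("timestamp")),
             r.getAs[String]("model")))
  .groupByKey()        // every value for a key crosses the shuffle
  .mapValues(_.toSet)  // deduplicate after the shuffle, like collect_set

In both cases all values per key have to be shuffled before deduplication, which is why the performance profiles are similar.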

would moving to the Dataset methods groupByKey.reduceGroups

No, it wouldn't. reduceGroups doesn't allow mutable buffers, so even in the best possible case you won't have much room for improvement.
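For reference, the Dataset variant you mention would look roughly like this (a sketch under assumed types; Hit and hits are hypothetical names standing in for your row type and joined Dataset):

import spark.implicits._

case class Hit(url: String, ctn: String, timestamp: Long, model: String)

val grouped = hits                              // Dataset[Hit]
  .groupByKey(h => (h.url, h.ctn, h.timestamp))
  .mapValues(h => Seq(h.model))
  .reduceGroups((a, b) => (a ++ b).distinct)    // merges immutable values pairwise

Because reduceGroups merges immutable values pairwise instead of filling a mutable buffer, it gives you no real advantage over the DataFrame aggregation here.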

or even RDD reduceByKey improve performance?

Probably not. You won't be able to improve on groupByKey with reduceByKey here, whatever the usual advice about preferring reduceByKey says.

Or is the problem with resources? There are 200 tasks performing the grouping; would increasing the number of these tasks help, and how would I even do it?

It might, but we can only guess. Tune spark.sql.shuffle.partitions and measure for yourself.
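For example (2000 is only a starting point to measure against, not a recommendation):

spark.conf.set("spark.sql.shuffle.partitions", "2000")  // default is 200, which matches the 200 tasks you see

or equivalently on the command line:

spark-submit ... --conf spark.sql.shuffle.partitions=2000 ...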

Upvotes: 1
