Reputation: 449
I have a relatively big amount of data. 5000 orc files, around 300 mb each. And 4 dictionaries(a few kb each).
After loading it in dataframes(around 1-2 hours) and joining, I try to group and aggregate it.
tableDS
.join(broadcast(enq1), $"url".contains($"q1"), "left_outer")
.join(broadcast(enq2), $"url".contains($"q2"), "left_outer")
.join(broadcast(enq3), $"url".contains($"q3"), "left_outer")
.join(broadcast(model), $"url".contains($"model"), "left_outer")
.groupBy($"url", $"ctn", $"timestamp")
.agg(
collect_set(when($"q2".isNotNull && $"q3".isNotNull && $"model".isNotNull, $"model").otherwise(lit(null))).alias("model"),
collect_set(when($"q1".isNotNull && $"q2".isNotNull && $"model".isNotNull, $"model").otherwise(lit(null))).alias("model2")
)
It takes 15-16 hours.
My questions are.
1)Does groupby for dataframes work the same as groupbykey for rdd'd(perform shaffle of all data) and if it does, would moving to dataset methods groupbykey.reducegroupes or even rdd reducebykey improve performance?
2)Or is the problem in the resources? There are 200 tasks perfoming the grouping, would increasing the number of these tasks help? How would I even do it?
This is how I run It
spark-submit \
--class Main \
--master yarn \
--deploy-mode client \
--num-executors 200 \
--executor-cores 20 \
--driver-memory 8G \
--executor-memory 16G \
--files hive-site.xml#hive-site.xml \
--conf spark.task.maxFailures=10 \
--conf spark.executor.memory=16G \
--conf spark.app.name=spark-job \
--conf spark.yarn.executor.memoryOverhead=4096 \
--conf spark.yarn.driver.memoryOverhead=2048 \
--conf spark.shuffle.service.enabled=true \
--conf spark.shuffle.consolidateFiles=true \
--conf spark.broadcast.compress=true \
--conf spark.shuffle.compress=true \
--conf spark.shuffle.spill.compress=true \
Upvotes: 0
Views: 1452
Reputation: 26
Does groupby for dataframes work the same as groupbykey for rdd'
With collect_set
it is pretty much the same as groupByKey
. It might behave better if number of duplicates is large, otherwise you can expect a similar performance profile to RDD.groupByKey
, not accounting for shuffle implementation differences.
would moving to dataset methods groupbykey.reducegroupes
No, it wouldn't. reduceByGroups
doesn't allow mutable buffers, so even in the best possible case you won't have much room for improvement.
or even rdd reducebykey improve performance?
Probably not. You won't be able to improve groupByKey
with reduceByKey
(if you believe this)
Or is the problem in the resources? There are 200 tasks perfoming the grouping, would increasing the number of these tasks help? How would I even do it?
It might, but we can only guess. Tune spark.sql.shuffle.partitions
and measure by yourself.
Upvotes: 1