Reputation: 327
We have a very large PySpark Dataframe, on which we need to perform a groupBy operation.
We've tried with
df_gp = df.groupBy('some_column').count()
and it's taking a really long time (it's been running for more than 17hrs with no result).
I also tried with
df_gp = df.groupBy('some_column').agg(F.count('some_column'))
(with from pyspark.sql import functions as F), but as far as I can tell the behaviour is the same.
For more context :
from pyspark_llap import HiveWarehouseSession
hive = HiveWarehouseSession.session(spark).build()
req=""" SELECT *
FROM table
where isodate='2020-07-27'
"""
df = hive.executeQuery(req)
count() or cache() on this Dataframe work in under a minute.
I've been reading about Spark's groupBy on different sources, but from what I gathered here, the Dataframe API doesn't need to load or shuffle keys in memory, so it should not be a problem even on large Dataframes.
I get that a groupBy on such a large volume of data can take some time, but this is really too much. I guess there are some memory parameters that may need tuning, or maybe there is something wrong with the way we're executing the groupBy operation?
[EDIT] I forgot to mention that some UDFs are applied to the Dataframe before the groupBy. I've tried:
- groupBy on a large Dataframe, without UDFs: gives a result in less than a minute
- groupBy on a sample of the processed Dataframe: same problem as before
So we're thinking the UDFs are the actual cause of the problem, not the groupBy.
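For reference, the comparison looked roughly like this (a minimal sketch: some_udf, its lambda body and the sample fraction are placeholders rather than our real processing logic; df is the Dataframe returned by hive.executeQuery above):

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Hypothetical UDF standing in for the real processing
some_udf = F.udf(lambda x: x.strip().lower() if x else x, StringType())

# 1) groupBy on the raw Dataframe, no UDFs: finishes in under a minute
df.groupBy('some_column').count().show()

# 2) groupBy on a small sample of the UDF-processed Dataframe: same problem as before
df_processed = df.withColumn('some_column', some_udf('some_column'))
df_processed.sample(fraction=0.01).groupBy('some_column').count().show()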
Upvotes: 0
Views: 4470
Reputation: 451
.groupBy('some_column').count() and .groupBy('some_column').agg(count('some_column')) are the same.
groupBy causes a shuffle; what that post meant is that it only shuffles the necessary column data (no extra columns that are not used in the groupBy or the agg function):
"I've been reading about Spark's groupBy on different sources, but from what I gathered here, Dataframe API doesn't need to load or shuffle keys in memory, so it should not be a problem even on large Dataframes."
groupBy can take time if:
- a lot of data is shuffled and spark.sql.shuffle.partitions is set low (200 by default). In that case one core has a big chunk of shuffled data to aggregate
- the groupBy column has data skew, which causes a lot of data to go to a single executor core
Set spark.sql.shuffle.partitions to a higher value (in my experience it should be around <amount_of_data_shuffled_in_gb>/100MB, to ensure each core gets around 100 MB of data to aggregate).
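If you want to try that, a minimal sketch of changing the setting at runtime (2000 is only an illustrative value; derive yours from the amount of shuffled data as described above):

# Raise the number of shuffle partitions before running the aggregation
# (2000 is an illustrative value, not a recommendation for this dataset)
spark.conf.set("spark.sql.shuffle.partitions", 2000)

df_gp = df.groupBy('some_column').count()

# explain() shows the Exchange (shuffle) step in the physical plan
df_gp.explain()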
Upvotes: 1
Reputation: 9
It's possible that it's running slow because of the underlying Hive query and not because of the groupBy operation. As you probably know, Spark does lazy evaluation, so the latency could be coming from either of the two.
One way to test it is to cache() the Dataframe or call a simple count() before executing the groupBy on it. If you see the same issue, it's because of the Hive query execution, and the solution will look different there. You could also try to read the data from a file and see if you notice the same execution times when performing the groupBy.
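For what it's worth, that test could look roughly like this (a minimal sketch, reusing df from the question; the timing prints are just for illustration):

import time

# Materialise the Hive query result first; count() forces the read and fills the cache
start = time.time()
df.cache()
df.count()
print(f"Hive read + cache: {time.time() - start:.1f}s")

# Then time the aggregation on the cached data
start = time.time()
df.groupBy('some_column').count().show()
print(f"groupBy on cached data: {time.time() - start:.1f}s")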
Upvotes: 0