Flxnt

Reputation: 327

Pyspark groupBy on large dataframe

We have a very large Pyspark Dataframe, on which we need to perform a groupBy operation.

We've tried with

df_gp=df.groupBy('some_column').count()

and it's taking a really long time (it's been running for more than 17hrs with no result).

I also tried with

df_gp=df.groupBy('some_column').agg(count)

but as far as I can tell the behaviour is the same.
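For reference, the explicit aggregate form I was aiming for (assuming count here is pyspark.sql.functions.count) would be something like

from pyspark.sql import functions as F

df_gp = df.groupBy('some_column').agg(F.count('*').alias('count'))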

For more context:

from pyspark_llap import HiveWarehouseSession
hive = HiveWarehouseSession.session(spark).build()

req=""" SELECT *
        FROM table
        where isodate='2020-07-27'
    """

df = hive.executeQuery(req)

I've been reading about Spark's groupBy on different sources, but from what I gathered here, Dataframe API doesn't need to load or shuffle keys in memory, so it should not be a problem even on large Dataframes.

I get that a groupBy on such a large volume of data can take some time, but this is really too much. I guess there are some memory parameters that may need tuning, or maybe there's something wrong with the way we're executing the groupBy operation?

[EDIT] I forgot to mention that there are some UDFs being applied to the Dataframe before the groupBy.

After some testing, we're now thinking the UDFs are the actual cause of the problem, not the groupBy.
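A rough way to check this (just a sketch; df_raw here stands for the dataframe straight out of the Hive query, before any UDFs are applied, and df_with_udfs for the one after the UDFs):

# groupBy directly on the raw dataframe, no UDFs involved
df_raw.groupBy('some_column').count().show()

# force the UDF stage on its own to see how long it takes
df_with_udfs.count()

If the first finishes quickly and the second is the slow part, the UDFs are the bottleneck rather than the groupBy.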

Upvotes: 0

Views: 4470

Answers (2)

Samir Vyas

Reputation: 451

A few myth-busters first

  1. .groupBy('some_column').count() and .groupBy('some_column').agg(count('*')) are the same

  2. groupBy does cause a shuffle; what that post meant is that it only shuffles the necessary column data (no extra columns that are not used in the groupBy or agg function)

    I've been reading about Spark's groupBy on different sources, but from what I gathered here, Dataframe API doesn't need to load or shuffle keys in memory, so it should not be a problem even on large Dataframes.

Now to your problem

  1. groupBy can take a long time if a lot of data is shuffled and spark.sql.shuffle.partitions is set low (200 by default). In that case a single core has a big chunk of shuffled data to aggregate
  2. it can also take a lot of time if the column used in groupBy has data skew, as that causes a lot of data to go to a single executor core

Solution

  1. increase spark.sql.shuffle.partitions to a higher value (in my experience it should be around <amount_of_data_shuffled_in_gb>/100MB, so that each core gets roughly 100 MB of data to aggregate)
  2. skew can be tackled by introducing randomness into the data (salting): https://dzone.com/articles/why-your-spark-apps-are-slow-or-failing-part-ii-da (see the sketch after this list)
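A rough sketch of both ideas (numbers are illustrative; spark is the SparkSession and some_column the grouping key from the question):

from pyspark.sql import functions as F

# 1. more shuffle partitions, aiming for ~100 MB of shuffled data per task
spark.conf.set("spark.sql.shuffle.partitions", "2000")

# 2. salting: spread a skewed key over N sub-keys, then aggregate in two steps
N = 20
salted = df.withColumn("salt", (F.rand() * N).cast("int"))
partial = salted.groupBy("some_column", "salt").count()
df_gp = partial.groupBy("some_column").agg(F.sum("count").alias("count"))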

Upvotes: 1

Reva

Reputation: 9

It's possible that it's running slowly because of the underlying Hive query and not because of the groupBy operation. As you probably know, Spark does lazy evaluation, so the latency could be coming from either of the two. One way to test it is to cache() the dataframe and call a simple count() before executing the groupBy on it. If you see the same issue, it's the Hive query execution that's slow, and the solution will look different there. You could also try to read the data from a file and see if you get the same execution times when performing the groupBy.
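For example, something along these lines (just a sketch to separate the two steps; hive and req are from the question):

import time

df = hive.executeQuery(req)

# materialize the Hive read first
t0 = time.time()
df.cache()
print("rows:", df.count(), "- read took", time.time() - t0, "s")

# now the groupBy runs on cached data only
t1 = time.time()
df.groupBy('some_column').count().show()
print("groupBy took", time.time() - t1, "s")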

Upvotes: 0
