Daniel de Paula

Reputation: 17872

Performance: Group by a subset of previous grouping columns

I have a DataFrame with two categorical columns, similar to the following example:

+----+-------+-------+
| ID | Cat A | Cat B |
+----+-------+-------+
|  1 |   A   |   B   |
|  2 |   B   |   C   |
|  5 |   A   |   B   |
|  7 |   B   |   C   |
|  8 |   A   |   C   |
+----+-------+-------+

I have some processing to do that needs two steps: The first one needs the data to be grouped by both categorical columns. In the example, it would generate the following DataFrame:

+-------+-------+-----+
| Cat A | Cat B | Cnt |
+-------+-------+-----+
|   A   |   B   |  2  |
|   B   |   C   |  2  |
|   A   |   C   |  1  |
+-------+-------+-----+

Then, the next step consists of grouping only by Cat A to compute a new aggregation, for example:

+-----+-----+
| Cat | Cnt |
+-----+-----+
|  A  |  3  |
|  B  |  2  |
+-----+-----+
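The semantics of the two steps can be illustrated with plain Scala collections (a sketch only, not Spark; it assumes the second aggregation sums the counts from the first):

```scala
// Sample data: (ID, Cat A, Cat B)
val rows = Seq((1, "A", "B"), (2, "B", "C"), (5, "A", "B"), (7, "B", "C"), (8, "A", "C"))

// Step 1: count rows per (Cat A, Cat B) pair
val step1: Map[(String, String), Int] =
  rows.groupBy { case (_, a, b) => (a, b) }.map { case (k, v) => k -> v.size }
// Map((A,B) -> 2, (B,C) -> 2, (A,C) -> 1)

// Step 2: re-aggregate the intermediate result by Cat A alone
val step2: Map[String, Int] =
  step1.groupBy { case ((a, _), _) => a }.map { case (a, m) => a -> m.values.sum }
// Map(A -> 3, B -> 2)
```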

Now come the questions:

  1. In my solution, I create the intermediate dataframe by doing

    val df2 = df.groupBy("catA", "catB").agg(...)
    

    and then I aggregate this df2 to get the last one:

    val df3 = df2.groupBy("catA").agg(...)
    

    I assume this is more efficient than aggregating the original DataFrame again. Is that a good assumption? Or does it make no difference?

  2. Are there any suggestions of a more efficient way to achieve the same results?

Upvotes: 0

Views: 450

Answers (1)

zero323

Reputation: 330453

Generally speaking, it looks like a good approach and should be more efficient than aggregating the data twice. Since shuffle files are implicitly cached, at least part of the work should be performed only once: when you call an action on df2 and subsequently on df3, you should see that the stages corresponding to df2 have been skipped. Also, the partial structure enforced by the first shuffle may reduce the memory requirements of the aggregation buffer during the second agg.

Unfortunately DataFrame aggregations, unlike RDD aggregations, cannot use a custom partitioner. This means you cannot compute both data frames with a single shuffle based on the value of catA; the second aggregation will require a separate exchange (hash partitioning). I doubt that alone justifies switching to RDDs.
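To see why a custom partitioner would help, here is a plain-Scala sketch (not Spark) of what partitioning by catA alone would buy: every (catA, catB) group lands in a single partition, so the second aggregation by catA can run locally without another exchange. The partition count and hash scheme are illustrative assumptions:

```scala
val rows = Seq(("A", "B"), ("B", "C"), ("A", "B"), ("B", "C"), ("A", "C"))

// Hypothetical hash partitioner keyed on catA only
val numPartitions = 2
def part(catA: String): Int = math.abs(catA.hashCode) % numPartitions

// One "shuffle": route each row to the partition of its catA value
val partitions: Map[Int, Seq[(String, String)]] =
  rows.groupBy { case (a, _) => part(a) }

// First aggregation, local to each partition: count by (catA, catB)
val firstAgg: Map[Int, Map[(String, String), Int]] =
  partitions.map { case (p, rs) => p -> rs.groupBy(identity).map { case (k, v) => k -> v.size } }

// Second aggregation also stays local: all rows for a given catA are co-located
val secondAgg: Map[Int, Map[String, Int]] =
  firstAgg.map { case (p, counts) =>
    p -> counts.groupBy { case ((a, _), _) => a }.map { case (a, m) => a -> m.values.sum }
  }
```

With DataFrames you cannot express this co-location guarantee, which is why the second groupBy introduces its own exchange.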

Upvotes: 3
