Reputation: 17872
I have a DataFrame with two categorical columns, similar to the following example:
+----+-------+-------+
| ID | Cat A | Cat B |
+----+-------+-------+
|  1 |   A   |   B   |
|  2 |   B   |   C   |
|  5 |   A   |   B   |
|  7 |   B   |   C   |
|  8 |   A   |   C   |
+----+-------+-------+
I have some processing to do that takes two steps. The first step needs the data grouped by both categorical columns; in the example, it would generate the following DataFrame:
+-------+-------+-----+
| Cat A | Cat B | Cnt |
+-------+-------+-----+
|   A   |   B   |  2  |
|   B   |   C   |  2  |
|   A   |   C   |  1  |
+-------+-------+-----+
Then, the next step consists of grouping only by Cat A to calculate a new aggregation, for example:
+-------+-----+
| Cat A | Cnt |
+-------+-----+
|   A   |  3  |
|   B   |  2  |
+-------+-----+
Now come the questions:
In my solution, I create the intermediate DataFrame by doing
val df2 = df.groupBy("catA", "catB").agg(...)
and then I aggregate this df2 to get the last one:
val df3 = df2.groupBy("catA").agg(...)
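To make this concrete, here is a minimal sketch of the two steps, assuming the first aggregation is a plain row count and the second sums those counts (the agg(...) calls above are placeholders for whatever is actually computed):

import org.apache.spark.sql.functions.sum

// Step 1: row count per (catA, catB) pair -- reproduces the first example table.
val df2 = df.groupBy("catA", "catB").count()
// Step 2: sum those counts per catA -- reproduces the second example table.
val df3 = df2.groupBy("catA").agg(sum("count").as("cnt"))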
I assume this is more efficient than aggregating the first DataFrame again. Is that a good assumption, or does it make no difference?
Are there any suggestions for a more efficient way to achieve the same results?
Upvotes: 0
Views: 450
Reputation: 330453
Generally speaking, it looks like a good approach and should be more efficient than aggregating the data twice. Since shuffle files are implicitly cached, at least part of the work should be performed only once. So when you call an action on df2 and subsequently on df3, you should see that the stages corresponding to df2 have been skipped. Also, the partial structure enforced by the first shuffle may reduce the memory requirements for the aggregation buffer during the second agg.
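For example, with count and sum standing in for the real aggregations, triggering two actions back to back makes the reuse visible:

import org.apache.spark.sql.functions.sum

// Stand-in aggregations, analogous to the question's sketch.
val df2 = df.groupBy("catA", "catB").count()
val df3 = df2.groupBy("catA").agg(sum("count"))

df2.count()  // first action: runs the shuffle keyed by (catA, catB) and writes shuffle files
df3.count()  // second action: the stages that produced df2 are marked "skipped" in the
             // Spark UI, because their shuffle output is reused instead of recomputed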
Unfortunately, DataFrame aggregations, unlike RDD aggregations, cannot use a custom partitioner. This means that you cannot compute both data frames using a single shuffle based on the value of catA, so the second aggregation will require a separate exchange with hash partitioning. I doubt it justifies switching to RDDs.
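Still, for reference, here is a rough sketch of what that RDD route could look like; the CatAPartitioner class, the 200 partitions, and the count/sum aggregations are all illustrative assumptions:

import org.apache.spark.Partitioner

// Illustrative partitioner that hashes on catA alone, so every row with the same
// catA lands in the same partition even though the key is the (catA, catB) pair.
class CatAPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key match {
    case (catA: String, _) => (catA.hashCode % numPartitions + numPartitions) % numPartitions
    case _                 => 0
  }
}

// Single shuffle: count per (catA, catB), partitioned by catA only.
val pairCounts = df.rdd
  .map(r => ((r.getAs[String]("catA"), r.getAs[String]("catB")), 1L))
  .reduceByKey(new CatAPartitioner(200), _ + _)

// The per-catA totals can then be finished inside each partition, with no
// second exchange, because all pairs sharing a catA value are co-located.
val perCatA = pairCounts.mapPartitions { iter =>
  iter.map { case ((a, _), cnt) => (a, cnt) }
      .toSeq
      .groupBy(_._1)
      .map { case (a, cnts) => (a, cnts.map(_._2).sum) }
      .iterator
}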
Upvotes: 3