Reputation: 409
I would like to aggregate this DataFrame
and count the number of observations with a value less than or equal to the "BUCKET" field for each level. For example:
val myDF = Seq(
("foo", 0),
("foo", 0),
("bar", 0),
("foo", 1),
("foo", 1),
("bar", 1),
("foo", 2),
("bar", 2),
("foo", 3),
("bar", 3)).toDF("COL1", "BUCKET")
myDF.show
+----+------+
|COL1|BUCKET|
+----+------+
| foo| 0|
| foo| 0|
| bar| 0|
| foo| 1|
| foo| 1|
| bar| 1|
| foo| 2|
| bar| 2|
| foo| 3|
| bar| 3|
+----+------+
I can count the number of observations matching each bucket value using this code:
myDF.groupBy("COL1").pivot("BUCKET").count.show
+----+---+---+---+---+
|COL1| 0| 1| 2| 3|
+----+---+---+---+---+
| bar| 1| 1| 1| 1|
| foo| 2| 2| 1| 1|
+----+---+---+---+---+
But I want to count the number of rows with a value in the "BUCKET" field which is less than or equal to the final header after pivoting, like this:
+----+---+---+---+---+
|COL1| 0| 1| 2| 3|
+----+---+---+---+---+
| bar| 1| 2| 3| 4|
| foo| 2| 4| 5| 6|
+----+---+---+---+---+
Upvotes: 2
Views: 1047
Reputation: 22449
Here's one approach to get the rolling counts by traversing the pivoted BUCKET value columns using foldLeft
to aggregate the counts. Note that a tuple of (DataFrame, Int) is used for foldLeft
to transform the DataFrame as well as store the count in the previous iteration:
val pivotedDF = myDF.groupBy($"COL1").pivot("BUCKET").count
val buckets = pivotedDF.columns.filter(_ != "COL1")
buckets.drop(1).foldLeft((pivotedDF, buckets.head))( (acc, c) =>
( acc._1.withColumn(c, col(acc._2) + col(c)), c )
)._1.show
// +----+---+---+---+---+
// |COL1| 0| 1| 2| 3|
// +----+---+---+---+---+
// | bar| 1| 2| 3| 4|
// | foo| 2| 4| 5| 6|
// +----+---+---+---+---+
Upvotes: 3
Reputation: 12804
You can achieve this using a window function, as follows:
import org.apache.spark.sql.expressions.Window.partitionBy
import org.apache.spark.sql.functions.first
myDF.
select(
$"COL1",
$"BUCKET",
count($"BUCKET").over(partitionBy($"COL1").orderBy($"BUCKET")).as("ROLLING_COUNT")).
groupBy($"COL1").pivot("BUCKET").agg(first("ROLLING_COUNT")).
show()
+----+---+---+---+---+
|COL1| 0| 1| 2| 3|
+----+---+---+---+---+
| bar| 1| 2| 3| 4|
| foo| 2| 4| 5| 6|
+----+---+---+---+---+
What you are specifying here is that you want to perform a count of your observations, partitioned in windows as determined by a key (COL1
in this case). By specifying an ordering, you are also making the count rolling over the window, thus obtaining the results you want then to be pivoted in your end results.
This is the result of applying the window function:
myDF.
select(
$"COL1",
$"BUCKET",
count($"BUCKET").over(partitionBy($"COL1").orderBy($"BUCKET")).as("ROLLING_COUNT")).
show()
+----+------+-------------+
|COL1|BUCKET|ROLLING_COUNT|
+----+------+-------------+
| bar| 0| 1|
| bar| 1| 2|
| bar| 2| 3|
| bar| 3| 4|
| foo| 0| 2|
| foo| 0| 2|
| foo| 1| 4|
| foo| 1| 4|
| foo| 2| 5|
| foo| 3| 6|
+----+------+-------------+
Finally, by grouping by COL1
, pivoting over BUCKET
and only getting the first result of the rolling count (anyone would be good as all of them are applied to the whole window), you finally obtain the result you were looking for.
In a way, window functions are very similar to aggregations over groupings, but are more flexible and powerful. This just scratches the surface of window functions and you can dig a little bit deeper by having a look at this introductory reading.
Upvotes: 3