Reputation: 17724
I'm new to the RDD API of Spark. Thanks to the question "Spark migrate sql window function to RDD for better performance", I managed to generate the following table:
+-----------------+---+
| _1| _2|
+-----------------+---+
| [col3TooMany,C]| 0|
| [col1,A]| 0|
| [col2,B]| 0|
| [col3TooMany,C]| 1|
| [col1,A]| 1|
| [col2,B]| 1|
|[col3TooMany,jkl]| 0|
| [col1,d]| 0|
| [col2,a]| 0|
| [col3TooMany,C]| 0|
| [col1,d]| 0|
| [col2,g]| 0|
| [col3TooMany,t]| 1|
| [col1,A]| 1|
| [col2,d]| 1|
| [col3TooMany,C]| 1|
| [col1,d]| 1|
| [col2,c]| 1|
| [col3TooMany,C]| 1|
| [col1,c]| 1|
+-----------------+---+
with an initial input of
val df = Seq(
  (0, "A", "B", "C", "D"),
  (1, "A", "B", "C", "D"),
  (0, "d", "a", "jkl", "d"),
  (0, "d", "g", "C", "D"),
  (1, "A", "d", "t", "k"),
  (1, "d", "c", "C", "D"),
  (1, "c", "B", "C", "D")
).toDF("TARGET", "col1", "col2", "col3TooMany", "col4")
val columnsToDrop = Seq("col3TooMany")
val columnsToCode = Seq("col1", "col2")
val target = "TARGET"
import org.apache.spark.sql.functions._
val exploded = explode(array(
  (columnsToDrop ++ columnsToCode).map(c =>
    struct(lit(c).alias("k"), col(c).alias("v"))): _*
)).alias("level")
val long = df.select(exploded, $"TARGET")
import org.apache.spark.util.StatCounter
then
long.as[((String, String), Int)].rdd.aggregateByKey(StatCounter())(_ merge _, _ merge _).collect.head
res71: ((String, String), org.apache.spark.util.StatCounter) = ((col2,B),(count: 3, mean: 0,666667, stdev: 0,471405, max: 1,000000, min: 0,000000))
This aggregates the TARGET statistics for each unique value of each column.
How can I add to the count (which is 3 for B in col2) a second count (maybe as a tuple) which represents the number of B in col2 where TARGET == 1? In this case, it should be 2.
Upvotes: 0
Views: 557
Reputation: 330393
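You shouldn't need an additional aggregate here. With a binary target column, mean is just the empirical probability of target being equal to 1, so both class counts follow from the statistics you already have:
number of rows with TARGET == 1: count * mean
number of rows with TARGET == 0: count * (1 - mean)
For (col2, B) this gives 3 * 0.666667 ≈ 2, which matches the expected result.
Upvotes: 2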
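The count * mean identity from the answer above can be sketched as a small helper. This is only an illustration under the assumption that the target is strictly 0/1; classCounts is a hypothetical name, not part of Spark, and the rounding step is an added safeguard against floating-point error in the mean.

```scala
// Sketch: recover per-class counts from the count and mean of a 0/1 target.
// `classCounts` is a hypothetical helper, not a Spark API.
def classCounts(count: Long, mean: Double): (Long, Long) = {
  // For a 0/1 column, count * mean equals the sum, i.e. the number of 1s.
  // Rounding absorbs small floating-point error in the mean.
  val ones = math.round(count * mean)
  // Everything that is not a 1 must be a 0.
  val zeros = count - ones
  (ones, zeros)
}

// (col2, B) from the question: count = 3, mean = 2/3
val (ones, zeros) = classCounts(3L, 2.0 / 3)
println(s"TARGET == 1: $ones, TARGET == 0: $zeros")
```

On the aggregated RDD from the question, this could presumably be applied with something like mapValues(s => classCounts(s.count, s.mean)). StatCounter also exposes sum, which for a 0/1 target is the same number of 1s.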