Reputation: 43
I would like to compute the mutual information (MI) between two variables x and y that I have in a Spark DataFrame which looks like this:
scala> df.show()
+---+---+
| x| y|
+---+---+
| 0| DO|
| 1| FR|
| 0| MK|
| 0| FR|
| 0| RU|
| 0| TN|
| 0| TN|
| 0| KW|
| 1| RU|
| 0| JP|
| 0| US|
| 0| CL|
| 0| ES|
| 0| KR|
| 0| US|
| 0| IT|
| 0| SE|
| 0| MX|
| 0| CN|
| 1| EE|
+---+---+
In my case, x happens to be whether an event is occurring (x = 1) or not (x = 0), and y is a country code, but these variables could represent anything. To compute the MI between x and y, I would like the above DataFrame grouped by (x, y) pairs with the following three additional columns:
count_x: the number of rows with this value of x
count_y: the number of rows with this value of y
count_xy: the number of rows with this (x, y) pair
In the short example above, it would look like
x, y, count_x, count_y, count_xy
0, FR, 17, 2, 1
1, FR, 3, 2, 1
...
Then I would just have to compute the mutual information term for each (x, y) pair and sum them.
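(Each term in that sum would be p(x, y) * log( p(x, y) / (p(x) * p(y)) ), where p(x, y) = count_xy / N, p(x) = count_x / N, p(y) = count_y / N, and N is the total number of rows.)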
So far, I have been able to group by (x, y) pairs and aggregate a count(*) column, but I couldn't find an efficient way to add the x and y counts. My current solution is to convert the DF into an array and count the occurrences and co-occurrences manually. It works well when y is a country, but it takes forever when the cardinality of y gets big. Any suggestions as to how I could do it in a more Sparkish way?
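For reference, the part I do have is just the pair counts, along the lines of:
// only the (x, y) pair counts so far, without count_x and count_y
val pairCounts = df.groupBy("x", "y").count()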
Thanks in advance!
Upvotes: 0
Views: 588
Reputation: 43
Recently, I had a similar task of computing probabilities, and here I would like to share my solution based on Spark's window aggregation functions:
// imports needed for the snippet below
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions.{col, sum}

// data is your DataFrame with two columns [x, y]
val cooccurrDF: DataFrame = data
  .groupBy(col("x"), col("y"))
  .count()
  .toDF("x", "y", "count-x-y")

// window specs for summing the pair counts per value of x and per value of y
val windowX: WindowSpec = Window.partitionBy("x")
val windowY: WindowSpec = Window.partitionBy("y")

val countsDF: DataFrame = cooccurrDF
  .withColumn("count-x", sum("count-x-y") over windowX)
  .withColumn("count-y", sum("count-x-y") over windowY)
countsDF.show()
First you group by every possible combination of the two columns and use count to get the number of co-occurrences. The windowed aggregates windowX and windowY then sum those pair counts within each partition, so you get the total counts for column x and for column y.
+---+---+---------+-------+-------+
| x| y|count-x-y|count-x|count-y|
+---+---+---------+-------+-------+
| 0| MK| 1| 17| 1|
| 0| MX| 1| 17| 1|
| 1| EE| 1| 3| 1|
| 0| CN| 1| 17| 1|
| 1| RU| 1| 3| 2|
| 0| RU| 1| 17| 2|
| 0| CL| 1| 17| 1|
| 0| ES| 1| 17| 1|
| 0| KR| 1| 17| 1|
| 0| US| 2| 17| 2|
| 1| FR| 1| 3| 2|
| 0| FR| 1| 17| 2|
| 0| TN| 2| 17| 2|
| 0| IT| 1| 17| 1|
| 0| SE| 1| 17| 1|
| 0| DO| 1| 17| 1|
| 0| JP| 1| 17| 1|
| 0| KW| 1| 17| 1|
+---+---+---------+-------+-------+
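For completeness, a minimal sketch of the final MI step on top of countsDF could look like this (natural logarithm; use log2 instead if you want the result in bits):
import org.apache.spark.sql.functions.{col, log, sum}

// total number of rows, to turn counts into probabilities
val total = cooccurrDF.agg(sum("count-x-y")).first().getLong(0).toDouble

// p(x,y) * log( p(x,y) / (p(x) * p(y)) ) rewritten with counts: (cxy / N) * log(cxy * N / (cx * cy))
val mi = countsDF
  .withColumn("mi-term",
    (col("count-x-y") / total) *
      log((col("count-x-y") * total) / (col("count-x") * col("count-y"))))
  .agg(sum("mi-term"))
  .first()
  .getDouble(0)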
Upvotes: 0
Reputation: 690
I would go with RDDs: generate a key for each use case, count by key and join the results. This way I know exactly what the stages are.
import org.apache.spark.rdd.RDD

rdd.cache() // rdd is your data as RDD[(Int, String)], i.e. (x, y) pairs
// countByKey/countByValue return local maps, so use reduceByKey to keep everything as RDDs
val xCnt:  RDD[(Int, Long)]           = rdd.map { case (x, _) => (x, 1L) }.reduceByKey(_ + _)
val yCnt:  RDD[(String, Long)]        = rdd.map { case (_, y) => (y, 1L) }.reduceByKey(_ + _)
val xyCnt: RDD[((Int, String), Long)] = rdd.map { case (x, y) => ((x, y), 1L) }.reduceByKey(_ + _)
// pair every x count with every y count, key by (x, y), then join with the pair counts
val tmp     = xCnt.cartesian(yCnt).map { case ((x, cx), (y, cy)) => ((x, y), (cx, cy)) }
val miReady = tmp.join(xyCnt).map { case ((x, y), ((cx, cy), cxy)) => ((x, y), cx, cy, cxy) }
Another option would be to use mapPartitions and simply work on the iterables, merging the results across partitions.
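A rough sketch of that mapPartitions idea (my own illustration, untested): build per-partition count maps for x, y and (x, y), then merge them into plain Scala maps on the driver.
import scala.collection.mutable

val (xMap, yMap, xyMap) = rdd
  .mapPartitions { it =>
    val xs  = mutable.Map.empty[Int, Long].withDefaultValue(0L)
    val ys  = mutable.Map.empty[String, Long].withDefaultValue(0L)
    val xys = mutable.Map.empty[(Int, String), Long].withDefaultValue(0L)
    // count within this partition only
    it.foreach { case (x, y) => xs(x) += 1; ys(y) += 1; xys((x, y)) += 1 }
    Iterator((xs.toMap, ys.toMap, xys.toMap))
  }
  .reduce { case ((x1, y1, xy1), (x2, y2, xy2)) =>
    // merge the per-partition maps by summing the counts per key
    def merge[K](a: Map[K, Long], b: Map[K, Long]): Map[K, Long] =
      (a.keySet ++ b.keySet).map(k => k -> (a.getOrElse(k, 0L) + b.getOrElse(k, 0L))).toMap
    (merge(x1, x2), merge(y1, y2), merge(xy1, xy2))
  }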
Upvotes: 2
Reputation: 69
I'm also new to Spark, but I have an idea of what to do. I do not know if this is the perfect solution, but I thought sharing it wouldn't harm.
What I would do is probably filter() for the value 1 to create one DataFrame and filter() for the value 0 to create a second DataFrame.
You would get something like
1st Dataframe
DO 1
DO 1
FR 1
In the next step I would groupBy(y).
So for the 1st DataFrame you would get
DO 1 1
FR 1
As GroupedData https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/GroupedData.html
This also has a count() function, which should count the rows per group. Unfortunately I do not have time to try this out myself right now, but I wanted to try and help anyway.
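A quick sketch of what I mean (untested; column names taken from the question):
import org.apache.spark.sql.functions.col

// split by the value of x, then count the rows per country within each split
val onesByCountry  = df.filter(col("x") === 1).groupBy("y").count()
val zerosByCountry = df.filter(col("x") === 0).groupBy("y").count()
onesByCountry.show()
zerosByCountry.show()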
Edit: Please let me know if this helped, otherwise I'll delete the answer so other people still take a look at this!
Upvotes: 1