Alexis Zubiolo

Reputation: 43

Counting both occurrences and cooccurrences in a DF

I would like to compute the mutual information (MI) between two variables x and y that I have in a Spark dataframe which looks like this:

scala> df.show()
+---+---+
|  x|  y|
+---+---+
|  0| DO|
|  1| FR|
|  0| MK|
|  0| FR|
|  0| RU|
|  0| TN|
|  0| TN|
|  0| KW|
|  1| RU|
|  0| JP|
|  0| US|
|  0| CL|
|  0| ES|
|  0| KR|
|  0| US|
|  0| IT|
|  0| SE|
|  0| MX|
|  0| CN|
|  1| EE|
+---+---+

In my case, x happens to be whether an event is occurring (x = 1) or not (x = 0), and y is a country code, but these variables could represent anything. To compute the MI between x and y, I would like to have the above dataframe grouped by (x, y) pairs with three additional columns: count_x (the number of rows with that value of x), count_y (the number of rows with that value of y), and count_xy (the number of rows with that (x, y) pair).

In the short example above, it would look like

x, y, count_x, count_y, count_xy
0, FR, 17, 2, 1
1, FR, 3, 2, 1
...

Then I would just have to compute the mutual information term for each x, y pair and sum them.
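For reference, the per-pair term I have in mind is p(x,y) * log(p(x,y) / (p(x) * p(y))). A rough sketch of it in Scala (purely illustrative names, n being the total number of rows) would be:

// Illustrative only: the MI contribution of a single (x, y) pair, given the three counts and the total row count n
def miTerm(countX: Long, countY: Long, countXY: Long, n: Long): Double = {
  val pxy = countXY.toDouble / n
  val px  = countX.toDouble / n
  val py  = countY.toDouble / n
  pxy * math.log(pxy / (px * py))
}
// MI(x; y) is then the sum of miTerm over all (x, y) pairs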

So far, I have been able to group by (x, y) pairs and aggregate a count(*) column, but I couldn't find an efficient way to add the x and y counts. My current solution is to convert the DF into an array and count the occurrences and cooccurrences manually. It works well when y is a country code, but it takes forever when the cardinality of y gets big. Any suggestions as to how I could do it in a more Sparkish way?

Thanks in advance!

Upvotes: 0

Views: 588

Answers (3)

Alice

Reputation: 43

Recently I had the same task of computing probabilities, and here I would like to share my solution based on Spark's window aggregation functions:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions.{col, sum}

// data is your DataFrame with two columns [x, y]
val cooccurrDF: DataFrame = data
  .groupBy(col("x"), col("y"))
  .count()
  .toDF("x", "y", "count-x-y")

// window specs partitioned by x and by y
val windowX: WindowSpec = Window.partitionBy("x")
val windowY: WindowSpec = Window.partitionBy("y")

// sum the pair counts over each window to get the marginal counts
val countsDF: DataFrame = cooccurrDF
  .withColumn("count-x", sum("count-x-y") over windowX)
  .withColumn("count-y", sum("count-x-y") over windowY)
countsDF.show()

First you group by every possible combination of the two columns and use count to get the number of cooccurrences. The window specifications windowX and windowY then let you sum over the aggregated rows, so you get the total counts for column x and column y respectively.

+---+---+---------+-------+-------+
|  x|  y|count-x-y|count-x|count-y|
+---+---+---------+-------+-------+
|  0| MK|        1|     17|      1|
|  0| MX|        1|     17|      1|
|  1| EE|        1|      3|      1|
|  0| CN|        1|     17|      1|
|  1| RU|        1|      3|      2|
|  0| RU|        1|     17|      2|
|  0| CL|        1|     17|      1|
|  0| ES|        1|     17|      1|
|  0| KR|        1|     17|      1|
|  0| US|        2|     17|      2|
|  1| FR|        1|      3|      2|
|  0| FR|        1|     17|      2|
|  0| TN|        2|     17|      2|
|  0| IT|        1|     17|      1|
|  0| SE|        1|     17|      1|
|  0| DO|        1|     17|      1|
|  0| JP|        1|     17|      1|
|  0| KW|        1|     17|      1|
+---+---+---------+-------+-------+
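If you also want the MI value itself, this last part is only a sketch (it assumes natural log and reuses the total row count of data and the column names above):

import org.apache.spark.sql.functions.{col, log, sum}

// Sketch: turn the counts into probabilities and sum the per-pair MI terms
val n = data.count() // total number of rows
val mi = countsDF
  .withColumn("mi-term",
    (col("count-x-y") / n) * log(col("count-x-y") * n / (col("count-x") * col("count-y"))))
  .agg(sum("mi-term"))
  .first()
  .getDouble(0)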

Upvotes: 0

z-star

Reputation: 690

I would go with RDDs: generate a key for each use case, count by key and join the results. This way I know exactly what the stages are.

import org.apache.spark.rdd.RDD

rdd.cache() // rdd is your data as an RDD[(Int, String)] of (x, y) pairs
val xCnt: RDD[(Int, Long)] = rdd.map { case (x, _) => (x, 1L) }.reduceByKey(_ + _)
val yCnt: RDD[(String, Long)] = rdd.map { case (_, y) => (y, 1L) }.reduceByKey(_ + _)
val xyCnt: RDD[((Int, String), Long)] = rdd.map { case (x, y) => ((x, y), 1L) }.reduceByKey(_ + _)
// pair every x count with every y count, then join in the cooccurrence counts
val tmp = xCnt.cartesian(yCnt).map { case ((x, xc), (y, yc)) => ((x, y), (xc, yc)) }
val miReady = tmp.join(xyCnt).map { case ((x, y), ((xc, yc), xyc)) => ((x, y), xc, yc, xyc) }

Another option would be to use mapPartitions, simply work on iterators, and merge the results across partitions, as in the sketch below.
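A rough sketch of that mapPartitions idea (only for the (x, y) pair counts; names are illustrative) could look like:

// Count (x, y) pairs inside each partition, then merge the partial maps
val partialCounts = rdd.mapPartitions { it =>
  val m = scala.collection.mutable.Map.empty[(Int, String), Long]
  it.foreach { case (x, y) => m((x, y)) = m.getOrElse((x, y), 0L) + 1L }
  Iterator(m.toMap)
}
val xyCounts: Map[(Int, String), Long] = partialCounts.reduce { (a, b) =>
  (a.keySet ++ b.keySet).map(k => k -> (a.getOrElse(k, 0L) + b.getOrElse(k, 0L))).toMap
}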

Upvotes: 2

raxous

Reputation: 69

I'm also new to Spark, but I have an idea of what to do. I do not know if this is the perfect solution, but I thought sharing it wouldn't harm.

What I would do is probably filter() for the value 1 to create one DataFrame and filter() for the value 0 to create a second DataFrame.

You would get something like

1st DataFrame

DO 1
DO 1
FR 1

In the next step I would groupBy(y).

So for the 1st DataFrame you would get

DO 1 1
FR 1

as a GroupedData (https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/GroupedData.html).

This also has a count() function which counts the rows per group. Unfortunately I do not have the time to try this out myself right now, but I wanted to try and help anyway.
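Untested, but a minimal sketch of this idea against the df from the question might be:

import org.apache.spark.sql.functions.col

// Split by the value of x, then count rows per country within each part
val onesDF  = df.filter(col("x") === 1).groupBy("y").count()
val zerosDF = df.filter(col("x") === 0).groupBy("y").count()
onesDF.show()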

Edit: Please let me know if this helped, otherwise I'll delete the answer so other people still take a look at this!

Upvotes: 1
