Sara

Reputation: 11

How do I count all of the co-occurring elements in a document line using Spark?

Let's say I have a document with a bunch of phrases separated by commas:

love, new, truck, present
environment, save, trying, stop, destroying
great, environment, save, money, animals, zoo, daughter, fun
impressive, loved, speech, inspiration
Happy Birthday, brother, years, old
save, money, stop, spending
new, haircut, love, check it out

Now I want to use Spark to count the number of co-occurring elements. Thus, I want to see

{
  (love, new): 2, 
  (new, truck): 1, 
  (love, truck): 1, 
  (truck, present): 1, 
  (new, present): 1,
  (love, present): 1, 
  (great, environment): 1, 
  (environment, save): 2,
  (environment, trying): 1, 
  .... 
  (love, check it out): 1
}

Any suggestions on how to do this?

I have already created an RDD of the document (I'm calling it phrase_list_RDD), and I understand I can use phrase_list_RDD.flatMap(lambda line: line.split(",")) to split each line into elements, but I'm having trouble coming up with the last part to solve my problem. If anyone has any suggestions, I would appreciate it.
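For reference, the per-line pairing I'm after looks like this in plain Python (no Spark yet), using itertools.combinations on the sample data above:

```python
from itertools import combinations
from collections import Counter

lines = [
    "love, new, truck, present",
    "environment, save, trying, stop, destroying",
    "great, environment, save, money, animals, zoo, daughter, fun",
    "impressive, loved, speech, inspiration",
    "Happy Birthday, brother, years, old",
    "save, money, stop, spending",
    "new, haircut, love, check it out",
]

counts = Counter()
for line in lines:
    # sort so each unordered pair is always keyed the same way
    words = sorted(w.strip() for w in line.split(","))
    # count every pair of distinct phrases appearing on the same line
    counts.update(combinations(words, 2))

print(counts[("love", "new")])  # 2 (appears in line 1 and line 7)
```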

Upvotes: 1

Views: 1444

Answers (2)

Tzach Zohar

Reputation: 37842

After splitting (and I've added trimming to get rid of spaces), you can use List.combinations(2) to get all combinations of two words. Passed into flatMap, this will result in an RDD[List[String]] where each record is a list of size 2.

From there on - it's a simple "word count":

val result: RDD[(List[String], Int)] = phrase_list_RDD
  .map(_.split(",").map(_.trim).toList) // convert records to List[String]
  .flatMap(_.combinations(2))  // take all combinations of two words
  .map((_, 1))                 // prepare for reducing - starting with 1 for each combination
  .reduceByKey(_ + _)          // reduce

// result:
// ... 
// (List(environment, daughter),1)
// (List(save, daughter),1)
// (List(money, stop),1)
// (List(great, environment),1)
// (List(save, stop),2)
// ... 

Upvotes: 2

radumanolescu

Reputation: 4161

Once you get the lines of text out of the RDD, you can split them and count the co-occurrences as shown below:

import scala.collection.mutable

object CoOccurrence {

  val text = Seq("love, new, truck, present", "environment, save, trying, stop, destroying", "great, environment, save, money, animals, zoo, daughter, fun", "impressive, loved, speech, inspiration", "Happy Birthday, brother, years, old", "save, money, stop, spending", "new, haircut, love, check it out")

  def main(args: Array[String]): Unit = {
    val cooc = mutable.Map.empty[(String, String), Int]

    text.foreach { line =>
      // sort so each unordered pair is always keyed the same way
      val words = line.split(",").map(_.trim).sorted
      val n = words.length
      // every pair (i, j) with i < j of phrases on the same line
      for {
        i <- 0 until n - 1
        j <- (i + 1) until n
      } {
        val pair = (words(i), words(j))
        cooc(pair) = cooc.getOrElse(pair, 0) + 1
      }
    }

    println(cooc)
  }
}

Upvotes: -1
