Reputation: 11
Let's say I have a document with a bunch of phrases separated by commas, one list per line:
love, new, truck, present
environment, save, trying, stop, destroying
great, environment, save, money, animals, zoo, daughter, fun
impressive, loved, speech, inspiration
Happy Birthday, brother, years, old
save, money, stop, spending
new, haircut, love, check it out
Now I want to use Spark to count how often each pair of elements co-occurs on the same line. That is, I want to see:
{
(love, new): 2,
(new, truck): 1,
(love, truck): 1,
(truck, present): 1,
(new, present): 1,
(love, present): 1,
(great, environment): 1,
(environment, save): 2,
(environment, trying): 1,
....
(love, check it out): 1
}
Any suggestions on how to do this?
I have already created an RDD of the document (I'm calling it phrase_list_RDD), and I understand I can use phrase_list_RDD.flatMap(lambda line: line.split(",")) to parse each line into elements, but I'm having trouble coming up with the last part to solve my problem. If anyone has any suggestions, I would appreciate it.
Upvotes: 1
Views: 1444
Reputation: 37842
After splitting (and I've added trimming to get rid of spaces), you can use List.combinations(2) to get all combinations of two words. Passed into flatMap, this will result in an RDD[List[String]] where each record is a list of size 2.
From there on - it's a simple "word count":
import org.apache.spark.rdd.RDD

val result: RDD[(List[String], Int)] = phrase_list_RDD
  // convert records to List[String]; sorting makes (love, new) and (new, love) the same key
  .map(_.split(",").map(_.trim).toList.sorted)
  .flatMap(_.combinations(2)) // take all combinations of two words
  .map((_, 1)) // prepare for reducing - starting with 1 for each combination
  .reduceByKey(_ + _) // reduce
// result (record order may vary):
// ...
// (List(daughter, environment),1)
// (List(daughter, save),1)
// (List(money, stop),1)
// (List(environment, great),1)
// (List(save, stop),2)
// ...
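Since the question uses PySpark syntax, here is a rough Python sketch of the same idea. It runs locally without Spark so the logic is easy to check. The function name count_cooccurrences is made up for illustration, and the comments only indicate which RDD step (flatMap, map, reduceByKey) each part is assumed to correspond to.

```python
from collections import Counter
from itertools import combinations


def count_cooccurrences(lines):
    """Count how often each unordered pair of phrases shares a line."""
    counts = Counter()
    for line in lines:
        # Split on commas, trim whitespace, and drop empty pieces.
        phrases = [p.strip() for p in line.split(",") if p.strip()]
        # Deduplicate and sort so (love, new) and (new, love) share one key.
        unique_sorted = sorted(set(phrases))
        # On an RDD, generating the pairs would be the body of a flatMap.
        for pair in combinations(unique_sorted, 2):
            # Plays the role of map to (pair, 1) followed by reduceByKey.
            counts[pair] += 1
    return counts


lines = [
    "love, new, truck, present",
    "environment, save, trying, stop, destroying",
    "great, environment, save, money, animals, zoo, daughter, fun",
    "impressive, loved, speech, inspiration",
    "Happy Birthday, brother, years, old",
    "save, money, stop, spending",
    "new, haircut, love, check it out",
]

result = count_cooccurrences(lines)
print(result[("love", "new")])          # 2
print(result[("environment", "save")])  # 2
```

Each key is a tuple in sorted order, so a lookup has to use that order as well, e.g. ("love", "new") rather than ("new", "love").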
Upvotes: 2
Reputation: 4161
Once you get the lines of text out of the dataframe, you can split them and count the co-occurrences as shown below:
import scala.collection.mutable

object CoOccurrence {
  val text = Seq(
    "love, new, truck, present",
    "environment, save, trying, stop, destroying",
    "great, environment, save, money, animals, zoo, daughter, fun",
    "impressive, loved, speech, inspiration",
    "Happy Birthday, brother, years, old",
    "save, money, stop, spending",
    "new, haircut, love, check it out"
  )

  def main(args: Array[String]): Unit = {
    // Maps each sorted pair of phrases to the number of lines containing both.
    val cooc = mutable.Map.empty[(String, String), Int]
    text.foreach { line =>
      // Sorting makes (love, new) and (new, love) the same key.
      val words = line.split(",").map(_.trim).sorted
      val n = words.length
      for {
        i <- 0 until n - 1
        j <- (i + 1) until n
      } {
        val pair = (words(i), words(j))
        cooc(pair) = cooc.getOrElse(pair, 0) + 1
      }
    }
    println(cooc)
  }
}
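The loop above keeps all counts in one in-memory map on a single machine. As a hedged illustration of why the same counting distributes well, the Python sketch below counts pairs separately for two chunks of lines and then merges the partial results. The chunking and the names count_chunk and merge are assumptions made for this example, not part of any Spark API.

```python
from collections import Counter
from functools import reduce
from itertools import combinations


def count_chunk(lines):
    """Count sorted phrase pairs within one chunk of lines."""
    counts = Counter()
    for line in lines:
        words = sorted({w.strip() for w in line.split(",") if w.strip()})
        # Counter.update tallies each pair produced by combinations.
        counts.update(combinations(words, 2))
    return counts


def merge(left, right):
    """Combine two partial results; addition is associative and commutative."""
    return left + right


# Two hypothetical chunks, standing in for partitions of the input.
chunks = [
    ["love, new, truck, present", "save, money, stop, spending"],
    ["new, haircut, love, check it out",
     "environment, save, trying, stop, destroying"],
]

total = reduce(merge, (count_chunk(c) for c in chunks), Counter())
print(total[("love", "new")])   # 2
print(total[("save", "stop")])  # 2
```

Because merging partial counts gives the same totals regardless of how the lines are grouped, the per-line work can be split across workers and combined afterwards.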
Upvotes: -1