HHH

Reputation: 6485

How to calculate term frequency for a document in Spark?

I'm working on a document classification algorithm in Spark. I want to create a dictionary from terms in each to-be-classified document. Here is what I have so far:

import java.io.StringReader
import org.apache.lucene.analysis.en.EnglishAnalyzer
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
import scala.collection.mutable

def tokenize(content: String): Seq[String] = {
  val tReader = new StringReader(content)
  val analyzer = new EnglishAnalyzer(LuceneVersion)
  val tStream = analyzer.tokenStream("contents", tReader)
  val term = tStream.addAttribute(classOf[CharTermAttribute])
  tStream.reset()

  val result = mutable.ArrayBuffer.empty[String]
  while (tStream.incrementToken()) {
    result += term.toString()
  }
  result
}

This function takes a string, tokenizes and stems it, and returns a Seq[String]. This is how I call the function:

val testInstance = sc.textFile("to-be-classified.txt")
testInstance.flatMap(line1 => tokenize(line1)).map(line2 => (line2,1))    

This is as far as I've gone. Can someone help me create a dictionary-like structure that has the term as the key and its frequency as the value?

EDIT: I thought of a better approach, but I can't quite write it. Here is part of it:

 case class doc_terms(filename:String, terms:List[Pair])

Then my idea is to create an object of class doc_terms for each document I read. It contains a list of all terms for that document. Then I do a reduceByKey to find the frequency of each term in each document. At the end I'll have an RDD in which each entry looks like (file1,[('term1',12),('term2',23)...]). Can someone help me write this?
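Something along these lines is what I have in mind, though it's untested and uses a plain whitespace split as a stand-in for my Lucene tokenizer (the directory path is just a placeholder):

val docTermCounts = sc.wholeTextFiles("docs-directory/")  // (filename, fullContents) pairs, one per file
  .map { case (filename, contents) =>
    val termCounts = contents.toLowerCase.split("\\s+")   // crude stand-in for tokenize()
      .foldLeft(Map.empty[String, Int]) { (m, t) => m.updated(t, m.getOrElse(t, 0) + 1) }
    (filename, termCounts.toList)                         // roughly doc_terms(filename, terms)
  }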

Upvotes: 0

Views: 4077

Answers (1)

Paul

Reputation: 27443

OK so I found two ways to do this.

I am going to use a simplified tokenizer; you can replace it with something more complex and everything should still run.

For text data I am using a text file of the novel War and Peace.

Note that I've changed the exact classes a bit to keep the types compatible. The term-count function is called study; it takes a single parameter (the input file) and returns a DocTerms.

Method 1

import scala.collection.immutable.WrappedString
import scala.collection.Map

// simplified tokenizer: lower-case the line and split on spaces
def tokenize(line:String):Array[String] =
    new WrappedString(line).toLowerCase().split(' ')

case class DocTerms(filename:String, terms:Map[String,Int])

def study(filename:String):DocTerms = {
    val counts = (sc
    .textFile(filename)
    .flatMap(tokenize)
    .map( (s:String) => (s,1) )
    .reduceByKey( _ + _ )       // sum the 1s for each distinct term
    .collectAsMap()             // bring the counts back to the driver as a Map
    )
    DocTerms(filename, counts)
}

val book1 = study("/data/warandpeace.txt")

for(c<-book1.terms.slice(0,20)) println(c)

output:

(worried,12)
(matthew.,1)
(follow,32)
(lost--than,1)
(diseases,1)
(reports.,1)
(scoundrel?,1)
(but--i,1)
(road--is,2)
(well-garnished,1)
(napoleon;,2)
(passion,,2)
(nataly,2)
(entreating,2)
(sounding,1)
(any?,1)
("sila,1)
(can,",3)
(motionless,22)

Note that this output is not sorted; Map types in general are not ordered, but they are fast for lookups and dictionary-like. Although only 20 elements were printed, all terms were counted and stored in the book1 object, which has type DocTerms.
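Because the terms field is a Map, you can look up the count for any single term directly; for instance (the keys here are just illustrative):

// dictionary-style lookups against the Map returned by study()
val princeCount = book1.terms.getOrElse("prince", 0)   // count of "prince", or 0 if absent
val missing = book1.terms.getOrElse("zeppelin", 0)     // a term that never occurs gives 0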

Method 2

Alternatively, the terms portion of DocTerms could be given the type List[(String,Int)] and sorted (at some computation cost) before being returned, so that the most frequent terms appear first. That means it will no longer be a Map, so you lose the fast dictionary-style lookup, but for some uses a list-like type might be preferable.

import scala.collection.immutable.WrappedString

// same simplified tokenizer as in Method 1
def tokenize(line:String):Array[String] =
    new WrappedString(line).toLowerCase().split(' ')

case class DocTerms(filename:String, terms:List[(String,Int)])

def study(filename:String):DocTerms = {
    val counts = (sc
        .textFile(filename)
        .flatMap(tokenize)
        .map( (s:String) => (s,1) )
        .reduceByKey( _ + _ )                                  // sum the 1s for each distinct term
        .sortBy[Int]( (pair:Tuple2[String,Int]) => -pair._2 )  // most frequent terms first
        .collect()
        )
    DocTerms(filename, counts.toList)
}

val book1 = study("/data/warandpeace.txt")

for(c<-book1.terms.slice(1,100)) println(c)

Output

(and,21403)
(to,16502)
(of,14903)
(,13598)
(a,10413)
(he,9296)
(in,8607)
(his,7932)
(that,7417)
(was,7202)
(with,5649)
(had,5334)
(at,4495)
(not,4481)
(her,3963)
(as,3913)
(it,3898)
(on,3666)
(but,3619)
(for,3390)
(i,3226)
(she,3225)
(is,3037)
(him,2733)
(you,2681)
(from,2661)
(all,2495)
(said,2406)
(were,2356)
(by,2354)
(be,2316)
(they,2062)
(who,1939)
(what,1935)
(which,1932)
(have,1931)
(one,1855)
(this,1836)
(prince,1705)
(an,1617)
(so,1577)
(or,1553)
(been,1460)
(their,1435)
(did,1428)
(when,1426)
(would,1339)
(up,1296)
(pierre,1261)
(only,1250)
(are,1183)
(if,1165)
(my,1135)
(could,1095)
(there,1094)
(no,1057)
(out,1048)
(into,998)
(now,957)
(will,954)
(them,942)
(more,939)
(about,919)
(went,846)
(how,841)
(we,838)
(some,826)
(him.,826)
(after,817)
(do,814)
(man,778)
(old,773)
(your,765)
(very,762)
("i,755)
(chapter,731)
(princess,726)
(him,,716)
(then,706)
(andrew,700)
(like,691)
(himself,687)
(natasha,683)
(has,677)
(french,671)
(without,665)
(came,662)
(before,658)
(me,657)
(began,654)
(looked,642)
(time,641)
(those,639)
(know,623)
(still,612)
(our,610)
(face,609)
(thought,608)
(see,605)

You might notice that the most common words are not very interesting. But we also have words like "prince", "princess", "andrew", "natasha", and "french", which are probably more specific to War and Peace.

To reduce the weight of common words once you have a corpus of documents, people often use TF-IDF ("term frequency-inverse document frequency") for scaling, which means each term's count is, roughly speaking, divided by the number of documents in the corpus in which it appears (or passed through some similar function involving logs). But that's a topic for another question.
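For concreteness, one common variant of that weighting looks roughly like the sketch below; the tfidf helper and its parameters are just illustrative, not part of the code above:

// sketch of a standard TF-IDF weighting:
//   tf  = raw count of the term in one document
//   idf = log(total documents / documents containing the term)
def tfidf(termCountInDoc: Int, docsWithTerm: Int, totalDocs: Int): Double =
  termCountInDoc * math.log(totalDocs.toDouble / docsWithTerm)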

Upvotes: 1
