Reputation: 121
I'm new to Spark and I'm trying to implement tf-idf. I need to calculate how many times each word occurs in each document and the total number of words in each document.
I want to do this with a reduce and possibly one more operation, but I don't know how yet. Here's the input I have:
Pairs are of the form (documentName, (word, wordCount))
For example:
("doc1", ("a", 3)), ("doc1", ("the", 2)), ("doc2", ("a", 5)),
("doc2",("have", 5))
Keys are document names, and values pair each word with the number of times it occurs in that document. I want to count the total number of words in every document and possibly calculate each word's share of that total.
Output I want:
("doc1", (("a", 3), 5)) , ("doc1", (("the", 2), 5)),
("doc2", (("a", 5),10)), ("doc2", (("have", 5),10))
I can get this effect with a join:
corpus.join(corpus.mapValues(lambda wc: wc[1]).reduceByKey(lambda x, y: x + y))
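Starting point:
from operator import add
from pyspark import SparkContext

collect_of_docs = [(doc1,text1), (doc2, text2),....]

def count_words(x):
    # x is (doc_name, text); emit ((word, doc_name), 1) for every token
    l = []
    words = x[1].split()
    for w in words:
        l.append(((w, x[0]), 1))
    return l

sc = SparkContext()
corpus = sc.parallelize(collect_of_docs)
input = (corpus
    .flatMap(count_words)
    .reduceByKey(add)
    # ((word, doc), count) -> (doc, (word, count))
    .map(lambda kv: (kv[0][1], (kv[0][0], kv[1]))))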
If possible, I'd like to use only one reduce operation, maybe with a clever operator. Any help is appreciated :) Thanks in advance.
Upvotes: 1
Views: 265
Reputation: 330413
Generally speaking, it doesn't make sense to flatMap just to gather your data later. I assume your data looks more or less like this:
collect_of_docs = sc.parallelize([
    (1, "Lorem ipsum dolor sit amet, consectetur adipiscing elit."),
    (2, "Mauris magna sem, vehicula sed dictum finibus, posuere id ipsum."),
    (3, "Duis eleifend molestie dolor, quis fringilla eros facilisis ac.")])
First we'll need some helpers using a basic regular expression and a Counter:
from __future__ import division  # If for some reason you use Python 2.x
import re
from collections import Counter

def count_words(doc, pattern=re.compile(r"\w+")):
    """Given a tuple (doc_id, text)
    return a tuple (doc_id, tokens_count)

    >>> count_words((1, "Foo bar bar."))
    (1, Counter({'bar': 2, 'Foo': 1}))
    """
    (doc_id, text) = doc
    return (doc_id, Counter(pattern.findall(text)))

def compute_tf(cnt):
    """Convert term counter to term frequency

    >>> compute_tf(Counter({'Foo': 1, 'bar': 2}))
    {'Foo': 0.3333333333333333, 'bar': 0.6666666666666666}
    """
    n = sum(cnt.values())
    return {k: v / n for (k, v) in cnt.items()}
and the final results:
tfs = (collect_of_docs
    .map(count_words)
    .mapValues(compute_tf))

tfs.sortByKey().first()
## (1,
##  {'Lorem': 0.125,
##   'adipiscing': 0.125,
##   'amet': 0.125,
##   'consectetur': 0.125,
##   'dolor': 0.125,
##   'elit': 0.125,
##   'ipsum': 0.125,
##   'sit': 0.125})
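If you really need the shape from the question, (doc, ((word, count), total)), the same per-document Counter is enough. Below is a minimal plain-Python sketch that can be checked without Spark; with_totals is a hypothetical helper name of my own, and count_words is repeated only so the snippet runs by itself.

```python
import re
from collections import Counter

WORD = re.compile(r"\w+")

def count_words(doc):
    """(doc_id, text) -> (doc_id, Counter of tokens)."""
    doc_id, text = doc
    return (doc_id, Counter(WORD.findall(text)))

def with_totals(doc_counts):
    """Hypothetical helper: (doc_id, Counter) -> [(doc_id, ((word, count), total)), ...]."""
    doc_id, cnt = doc_counts
    total = sum(cnt.values())  # total number of tokens in this document
    return [(doc_id, ((word, n), total)) for word, n in cnt.items()]

rows = with_totals(count_words(("doc1", "a the a the a")))
print(sorted(rows))
# [('doc1', (('a', 3), 5)), ('doc1', (('the', 2), 5))]
```

On an RDD this would be roughly collect_of_docs.map(count_words).flatMap(with_totals), which needs neither a join nor a reduce because each document is handled as a whole.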
Using tfs from above, the document frequency can be computed as follows:
from operator import add

dfs = (tfs
    .values()
    .flatMap(lambda kv: ((k, 1) for k in kv.keys()))
    .reduceByKey(add))

dfs.sortBy(lambda x: -x[1]).take(5)
## [('ipsum', 2),
##  ('dolor', 2),
##  ('consectetur', 1),
##  ('finibus', 1),
##  ('fringilla', 1)]
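To get from here to tf-idf, the term frequencies and document frequencies still have to be combined. The following is a minimal plain-Python sketch of that step, not Spark code: tfs and dfs are small hand-written stand-ins for the collected results, tf_idf is a hypothetical helper, and the unsmoothed log(N / df) weighting is an assumption, since several variants are in common use.

```python
from math import log

# Hand-written stand-ins for the collected tfs and dfs of a tiny corpus
tfs = [
    (1, {"a": 0.5, "b": 0.5}),
    (2, {"a": 1.0}),
]
dfs = {"a": 2, "b": 1}
n_docs = len(tfs)

def tf_idf(tf, dfs, n_docs):
    """Hypothetical helper: weight each term frequency by log(n_docs / df)."""
    return {term: freq * log(n_docs / dfs[term]) for term, freq in tf.items()}

scores = [(doc_id, tf_idf(tf, dfs, n_docs)) for doc_id, tf in tfs]
print(scores)
```

A term that appears in every document ("a" here) gets weight 0 under this variant. In Spark, if the vocabulary is small enough to fit in memory, one option is to fetch dfs with collectAsMap(), broadcast it, and apply the helper with mapValues.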
Upvotes: 1