Reputation: 128
I'm trying to implement a simple Doc2Vec algorithm in PySpark using a pre-trained GloVe model from https://nlp.stanford.edu/projects/glove/.
I have two RDDs:
A pair RDD called documents, in the form (K, [V]) where K is the document ID and [V] is a list of all the words in that document, for example:
('testDoc1', 'i am using spark')
('testDoc2', 'testing spark')
A pair RDD called words, representing the word embeddings in the form K:V, where K is a word and V is the vector that represents the word, for example:
('i', [0.1, 0.1, 0.1])
('spark', [0.2, 0.2, 0.2])
('am', [0.3, 0.3, 0.3])
('testing', [0.5, 0.5, 0.5])
('using', [0.4, 0.4, 0.4])
What is the correct way to iterate through the words in documents to get the average of the word vectors for each document? In the above example, the end result would look like:
('testDoc1', [0.25, 0.25, 0.25])
('testDoc2', [0.35, 0.35, 0.35])
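For reference, the example data above could be created like this (a sketch assuming an existing SparkContext named sc):

documents = sc.parallelize([
    ('testDoc1', 'i am using spark'),
    ('testDoc2', 'testing spark'),
])
words = sc.parallelize([
    ('i', [0.1, 0.1, 0.1]),
    ('spark', [0.2, 0.2, 0.2]),
    ('am', [0.3, 0.3, 0.3]),
    ('testing', [0.5, 0.5, 0.5]),
    ('using', [0.4, 0.4, 0.4]),
])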
Upvotes: 3
Views: 544
Reputation: 1824
Suppose you have a function tokenize that transforms a string into a list of words. Then you can flatMap documents to get an RDD of (word, document id) tuples:
flattened_docs = documents.flatMap(lambda x: [(word, x[0]) for word in tokenize(x[1])])
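For example, a minimal tokenize might just split on whitespace (this helper is an assumption about your input, not part of the original answer; adjust for punctuation, casing, etc.):

def tokenize(text):
    # naive whitespace tokenizer
    return text.split()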
Then joining with words will give you (word, (document id, vector)) tuples, and you can drop the words at this point:
doc_vectors = flattened_docs.join(words).values()
Note that this is an inner join, so you're throwing away any words that do not have embeddings. Since you presumably want to count those words in your average, a left outer join is likely more appropriate, and you'll then have to replace any resulting None values with the zero vector (or whatever vector you choose).
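A sketch of that variant, assuming the embeddings are 3-dimensional as in the example (so the zero vector is [0.0, 0.0, 0.0]):

doc_vectors = (flattened_docs
               .leftOuterJoin(words)   # keeps words with no embedding; their vector is None
               .values()               # (document id, vector-or-None) pairs
               .mapValues(lambda v: v if v is not None else [0.0, 0.0, 0.0]))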
We can group by document id to get an RDD of (document id, [list of vectors]) and then average (I'll assume you have a function called average):
final_vectors = doc_vectors.groupByKey().mapValues(average)
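One possible average, using NumPy to take the element-wise mean of the grouped vectors (the exact implementation is up to you):

import numpy as np

def average(vectors):
    # vectors is an iterable of equal-length lists; return their element-wise mean
    return np.mean(list(vectors), axis=0).tolist()

With the example data this yields the expected result:

final_vectors.collect()
# [('testDoc1', [0.25, 0.25, 0.25]), ('testDoc2', [0.35, 0.35, 0.35])]  (order may vary)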
(Please excuse my Scala-influenced Python. It's been a while since I've used PySpark and I haven't checked whether it's flatMap or flat_map and so on.)
Upvotes: 3