Reputation: 22254
Please help me understand the output of the Spark ML [CountVectorizer][1] and suggest which documentation explains it.
val cv = new CountVectorizer()
.setInputCol("Tokens")
.setOutputCol("Frequencies")
.setVocabSize(5000)
.setMinTF(1)
.setMinDF(2)
val fittedCV = cv.fit(tokenDF.select("Tokens"))
fittedCV.transform(tokenDF.select("Tokens")).show(false)
2374 should be the number of terms (words) in the dictionary. What is "[2,6,328,548,1234]"?
Are they indices of the words "[airline, bag, vintage, world, champion]" in the dictionary? If so, why does the same word "airline" have a different index "0" in the second row?
+------------------------------------------+----------------------------------------------------------------+
|Tokens |Frequencies |
+------------------------------------------+----------------------------------------------------------------+
...
|[airline, bag, vintage, world, champion] |(2374,[2,6,328,548,1234],[1.0,1.0,1.0,1.0,1.0]) |
|[airline, bag, vintage, jet, set, brown] |(2374,[0,2,6,328,405,620],[1.0,1.0,1.0,1.0,1.0,1.0]) |
+------------------------------------------+----------------------------------------------------------------+
[1]: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.feature.CountVectorizer
Upvotes: 3
Views: 2647
Reputation: 8044
There is some documentation explaining the basics, but it is pretty bare.
Yes, the numbers are indices of the words in the vocabulary. However, the positions in the Frequencies vector do not correspond to the positions in the Tokens vector: the indices of a sparse vector are always stored in ascending order.
airline, bag, vintage
appear in both rows, hence they map to the same indices [2, 6, 328] in both. The index 0 in the second row is not airline; it belongs to one of the other tokens (jet, set or brown). You can't rely on the order matching the tokens.
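To double-check the mapping, you can look at the fitted model's vocabulary directly. A minimal sketch, assuming the model was fitted via PySpark so that fittedCV.vocabulary is available as a Python list:

# vocabulary is ordered by descending corpus frequency;
# a term's position in this list is its index in the sparse vector
vocab = fittedCV.vocabulary
print(len(vocab))                       # 2374
print([vocab[i] for i in (2, 6, 328)])  # the three shared tokens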
The cell's data type is a SparseVector. The first array holds the indices and the second the values (counts), e.g.
vector[328]
=> 1.0
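For illustration, here is a minimal, self-contained example of how a SparseVector behaves, built with pyspark.ml.linalg directly:

from pyspark.ml.linalg import Vectors

# size 2374, non-zero entries at indices 2, 6 and 328
v = Vectors.sparse(2374, [2, 6, 328], [1.0, 1.0, 1.0])
print(v.indices)  # [  2   6 328]
print(v[328])     # 1.0
print(v[1000])    # 0.0 -- any index not listed reads as zero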
A mapping could be as follows:

vocabulary
vintage   2
bag       6
airline 328

Frequencies
(2374, [2, 6, 328], [99.0, 5.0, 7.0])

# counts
vintage x 99
bag     x  5
airline x  7
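Putting the two together, a small sketch of a driver-side helper (hypothetical name row_counts, not part of the Spark API) that rebuilds the word -> count mapping for one vector:

def row_counts(v, vocab):
    # pair each non-zero index with its word and its count
    return {vocab[int(i)]: float(v[int(i)]) for i in v.indices}

# row_counts(v, fittedCV.vocabulary)
# => {'vintage': 99.0, 'bag': 5.0, 'airline': 7.0}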
In order to get the words back, you can do a lookup in the vocabulary. The vocabulary needs to be broadcast to the workers. You also most probably want to explode the counts per document into separate rows.
Here is a Python snippet that uses a UDF to extract the top 25 most frequent words per document into separate rows and then computes the mean count for each word:
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql import Row

# broadcast the vocabulary so every worker can do index -> word lookups
vocabulary = sc.broadcast(fittedCV.vocabulary)

def _top_scores(v):
    # create a count tuple for each index (i) in the vector (v);
    # `.item()` is needed because in Python the values are numpy
    # datatypes, whereas in Scala they would be plain doubles
    counts = [Row(i=i.item(), count=v[i.item()].item()) for i in v.indices]
    # => [Row(i=2, count=30.0), Row(i=362, count=40.0)]
    # keep the 25 rows with the highest counts
    counts = sorted(counts, reverse=True, key=lambda x: x.count)
    return counts[:25]

top_scores = F.udf(_top_scores, T.ArrayType(
    T.StructType().add('i', T.IntegerType()).add('count', T.DoubleType())))

def _vec_to_word(i):
    return vocabulary.value[i]

vec_to_word = F.udf(_vec_to_word, T.StringType())

res = df.withColumn('word_count', F.explode(top_scores('Frequencies')))
=>
+------+-----+-----------+
|doc_id| ... |word_count |
|      |     |(i, count) |
+------+-----+-----------+
|4711  | ... |(2, 30.0)  |
|4711  | ... |(362, 40.0)|
+------+-----+-----------+
res = res \
    .groupBy('word_count.i') \
    .agg(F.avg('word_count.count').alias('mean')) \
    .orderBy('mean', ascending=False)

res = res.withColumn('token', vec_to_word('i'))
=>
+---+-------+----+
|i  |token  |mean|
+---+-------+----+
|328|airline|30.0|
|2  |vintage|15.0|
+---+-------+----+
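As a side note: on Spark 3.0+ you can avoid the Python UDF entirely with pyspark.ml.functions.vector_to_array plus posexplode. A sketch, assuming a doc_id column as in the example above:

from pyspark.ml.functions import vector_to_array  # Spark 3.0+
import pyspark.sql.functions as F

# one row per (doc, vocabulary index, count), zero counts filtered out
exploded = df.select(
    'doc_id',
    F.posexplode(vector_to_array('Frequencies')).alias('i', 'count')
).where('count > 0')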
Upvotes: 4