mon

Reputation: 22254

Explanation of Spark ML CountVectorizer output

Please help me understand the output of the Spark ML [CountVectorizer][1], and suggest which documentation explains it.

val cv = new CountVectorizer()
  .setInputCol("Tokens")
  .setOutputCol("Frequencies")
  .setVocabSize(5000)
  .setMinTF(1)
  .setMinDF(2)
val fittedCV = cv.fit(tokenDF.select("Tokens"))
fittedCV.transform(tokenDF.select("Tokens")).show(false)

2374 should be the number of terms (words) in the dictionary. What is "[2,6,328,548,1234]"?

Are they the indices of the words "[airline, bag, vintage, world, champion]" in the dictionary? If so, why does the same word "airline" have a different index "0" in the second line?

+------------------------------------------+----------------------------------------------------------------+
|Tokens                                    |Frequencies                                                     |
+------------------------------------------+----------------------------------------------------------------+
...
|[airline, bag, vintage, world, champion]  |(2374,[2,6,328,548,1234],[1.0,1.0,1.0,1.0,1.0])                 |
|[airline, bag, vintage, jet, set, brown]  |(2374,[0,2,6,328,405,620],[1.0,1.0,1.0,1.0,1.0,1.0])            |
+------------------------------------------+----------------------------------------------------------------+


  [1]: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.feature.CountVectorizer

Upvotes: 3

Views: 2647

Answers (1)

dre-hh

Reputation: 8044

There is some documentation explaining the basics; however, it is pretty bare.

Yes, the numbers are indices of the words in the vocabulary. However, the order in the frequencies vector does not correspond to the order in the tokens vector: a SparseVector always stores its indices in ascending order. airline, bag and vintage appear in both rows, hence they correspond to the shared indices [2,6,328]; the index 0 in the second row belongs to one of the remaining tokens (jet, set or brown), not to airline. So you can't rely on the token order.
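To check which word sits at which index, you can look the indices up in the model's vocabulary. A minimal sketch, assuming you are on the PySpark API and fittedCV is the fitted CountVectorizerModel (the snippet further down makes the same assumption):

vocab = fittedCV.vocabulary     # list of terms; list position == vector index
for i in [2, 6, 328]:           # the indices shared by both example rows
    print(i, vocab[i])          # should print airline, bag, vintage in some order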

The column's data type is a SparseVector, printed as (size, indices, values): the first number is the size of the vector (the vocabulary size), the first array holds the indices, and the second array holds the values.

e.g.

vector[328] 
   => 1.0

A mapping could be as follows:

vocabulary
airline -> 328
bag     -> 6
vintage -> 2

Frequencies
(2374, [2, 6, 328], [99, 5, 7])

# counts
vintage x 99
bag     x 5
airline x 7
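To make the (size, indices, values) triple concrete, here is a small sketch that rebuilds this hypothetical frequencies vector with PySpark's Vectors.sparse and reads individual counts back out (the terms and counts are the made-up ones from the mapping above, not real data):

from pyspark.ml.linalg import Vectors

# hypothetical vector: vocabulary size 2374, indices [2, 6, 328], counts [99, 5, 7]
v = Vectors.sparse(2374, [2, 6, 328], [99.0, 5.0, 7.0])

print(v[2])    # 99.0 -> count of "vintage" per the mapping above
print(v[328])  # 7.0  -> count of "airline"
print(v[1])    # 0.0  -> an index not stored in the sparse vector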

In order to get the words back, you can do a lookup in the vocabulary. The vocabulary needs to be broadcast to the workers. You also most probably want to explode the counts per document into separate rows.

Here is a Python code snippet that uses a UDF to extract the top 25 most frequent words per document into separate rows and then computes the mean count for each word:

import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql import Row

vocabulary = sc.broadcast(fittedCV.vocabulary)

def _top_scores(v):
    # create an (index, count) row for each stored index (i) in the vector (v)
    # `.item()` converts the numpy datatypes returned by the vector into plain
    # Python types; in Scala the values would already be plain doubles
    counts = [Row(i=i.item(), count=v[i.item()].item()) for i in v.indices]
    # => e.g. [Row(i=2, count=30.0), Row(i=362, count=40.0)]

    # return the 25 rows with the highest counts
    counts = sorted(counts, reverse=True, key=lambda x: x.count)
    return counts[:25]

def _vecToWord(i):
    # look an index up in the broadcast vocabulary
    return vocabulary.value[i]

top_scores = F.udf(_top_scores, T.ArrayType(T.StructType().add('i', T.IntegerType()).add('count', T.DoubleType())))
vec_to_word = F.udf(_vecToWord, T.StringType())

res = df.withColumn('word_count', F.explode(top_scores('Frequencies')))
=>
+------+-----+-----------+
|doc_id| ... |word_count |
|      |     |(i, count) |
+------+-----+-----------+
|4711  | ... |(362, 40.0)|
|4711  | ... |(2, 30.0)  |
+------+-----+-----------+

res = res \
    .groupBy('word_count.i') \
    .agg(F.avg('word_count.count').alias('mean')) \
    .orderBy('mean', ascending=False)

res = res.withColumn('token', vec_to_word('i'))


=>
+---+-------+----+
|i  |token  |mean|
+---+-------+----+
|328|airline|30.0|
|2  |vintage|15.0|
+---+-------+----+

Upvotes: 4
