Reputation: 567
I am fairly new to Spark. My task is to get the top hundred words from a set of tweets for each lowercase letter of the alphabet. For example:
a: (word1, count1), (word2, count2).. (word100, count100)
b: (word1, count1), (word2, count2).. (word100, count100)
.
.
z: (word1, count1), (word2, count2).. (word100, count100)
This is my code:
words_mapped = (en_text.flatMap(lambda x: x.split())
                .filter(lambda x: x[0] in valid_chars)
                .map(lambda x: (x[0], x)))
This gives tuples of (character, word). Now I have to group by character, count each word within its group, and show the top 100 words with their counts.
How can I do this in PySpark?
Upvotes: 0
Views: 1947
Reputation: 1676
Spark makes it easy to aggregate (key, value) pairs. Here you have two stages: in the first stage your key is (character, word), and in the second stage your key is (character). (First you calculate the counts; second you find the most frequent words for each character.)
The first is pretty simple using reduceByKey (docs).
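from operator import add
# Pair each (character, word) key with a count of 1, then sum the counts per key.
words_counted = (words_mapped.map(lambda x: (x, 1))
                 .reduceByKey(add))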
Now we need to filter down to the top 100. This is something Spark is not as good at, because it requires dealing with multiple rows at once. Scala has the topByKey function, but that doesn't appear to be supported in PySpark yet.
So instead let's loop over the characters (at least there are only 26) and use takeOrdered (docs) like so:
char = 'a'
# takeOrdered is an action, so the result is a plain Python list, not an RDD.
top_words = words_counted.filter(lambda x: x[0][0] == char).takeOrdered(100, key=lambda x: -x[1])
You can then concatenate those lists as appropriate.
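The loop over all characters could be sketched as follows. This is a minimal sketch, assuming words_counted holds ((character, word), count) pairs as above; the function name top_words_per_char and its parameters are made up for illustration.

```python
import string


def top_words_per_char(words_counted, chars=string.ascii_lowercase, n=100):
    """Return {char: [((char, word), count), ...]} with up to n entries per char.

    words_counted is assumed to be an RDD of ((char, word), count) pairs.
    Each iteration launches one Spark job (filter + takeOrdered).
    """
    result = {}
    for char in chars:
        result[char] = (
            words_counted
            # Bind char as a default argument so the lambda keeps this letter.
            .filter(lambda x, c=char: x[0][0] == c)
            # Negating the count returns the n largest counts, highest first.
            .takeOrdered(n, key=lambda x: -x[1])
        )
    return result
```

Since this runs one job per character over the same RDD, calling words_counted.cache() beforehand may avoid recomputing the counts each time.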
Some alternative approaches: use partitionBy (docs) to put each group in its own partition, and then mapPartitions (docs) to convert the iterator for each group into the relevant object (say, by sorting it and then taking the top 100).
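A sketch of the two functions the partition-based approach would need, assuming the data has first been re-keyed from ((char, word), count) to (char, (word, count)) and that the valid characters are 'a' to 'z'. The names char_partitioner and top_n_per_char are hypothetical; the commented-out lines show how they might be wired into partitionBy and mapPartitions.

```python
import heapq
from collections import defaultdict


def char_partitioner(char):
    """Map 'a'..'z' to partition indices 0..25."""
    return ord(char) - ord('a')


def top_n_per_char(records, n=100):
    """Yield (char, top-n [(word, count), ...]) for the records in one partition.

    records is an iterator of (char, (word, count)) pairs. Grouping by char
    keeps this correct even if a partition holds more than one letter.
    """
    groups = defaultdict(list)
    for char, word_count in records:
        groups[char].append(word_count)
    for char, word_counts in groups.items():
        # nlargest returns the entries sorted by count, highest first.
        yield char, heapq.nlargest(n, word_counts, key=lambda wc: wc[1])


# Possible wiring (assumes words_counted from above; not run here):
# by_char = words_counted.map(lambda x: (x[0][0], (x[0][1], x[1])))
# top = (by_char.partitionBy(26, char_partitioner)
#               .mapPartitions(top_n_per_char)
#               .collectAsMap())
```

Note that this sketch holds all of a partition's records in memory before picking the top entries, which is simple but may not suit very large groups.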
Another possibility is to use foldByKey (docs), starting with an empty list, adding the next element to the list by binary insertion, and then dropping any elements after the 100th.
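The list-building idea might look like the sketch below. It swaps in aggregateByKey rather than foldByKey, because the fold function would have to handle both "list plus element" and "list plus list", while aggregateByKey takes a separate function for each case. It assumes an RDD of (char, (word, count)) pairs; insert_top_n and merge_top_n are hypothetical names.

```python
import bisect


def insert_top_n(top, word_count, n=100):
    """Insert (word, count) into top, kept sorted by descending count, capped at n."""
    word, count = word_count
    # Store (-count, word) so that ascending order puts the highest count first.
    bisect.insort(top, (-count, word))
    del top[n:]  # drop anything past the n-th entry
    return top


def merge_top_n(left, right, n=100):
    """Merge two top lists built on different partitions and keep the best n."""
    return sorted(left + right)[:n]


# Possible wiring (assumes by_char is an RDD of (char, (word, count)); not run here):
# top = by_char.aggregateByKey([], insert_top_n, merge_top_n)
# readable = top.mapValues(lambda xs: [(w, -c) for c, w in xs])
```

Each list never grows beyond n + 1 entries, so memory per character stays bounded regardless of how many distinct words start with it.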
Upvotes: 1