Reputation: 193
I am new to Apache Spark, and I have implemented a simple map function as follows:
from pyspark import SparkContext
sc = SparkContext( 'local', 'pyspark')
f = open("Tweets_tokenised.txt")
tokenised_tweets = f.readlines()
f = open("positive.txt")
pos_words=f.readlines()
f = open("negative.txt")
neg_words=f.readlines()
def sentiment(line):
    global pos_words
    global neg_words
    pos = 0
    neg = 0
    for word in line.split():
        if word in pos_words:
            pos = pos + 1
        if word in neg_words:
            neg = neg + 1
    if pos > neg:
        return 1
    else:
        return 0
dist_tweets=sc.textFile("Tweets_tokenised.txt").map(sentiment)
#(lambda line: sentiment(line))
dist_tweets.saveAsTextFile("RDD.txt")
Basically I am reading a file (containing tokenised and stemmed tweets) and then doing a simple positive/negative word count on it within the map function (third line from the end). But RDD.txt has nothing in it, and the function sentiment is not being called at all. Can someone point out the error?
Upvotes: 1
Views: 1756
Reputation: 18022
You can't change the value of a global variable inside a map transformation in Apache Spark. To achieve that you would need an Accumulator, but even with one I don't think it is the correct approach.
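For illustration only, a minimal sketch of the accumulator route (pos_list and tally are hypothetical names; the accumulated value only becomes visible on the driver after an action runs):

# A minimal sketch of the accumulator route (not recommended here).
pos_list = ["good", "gold", "silver"]   # hypothetical small word list
pos_count = sc.accumulator(0)

def tally(line):
    for word in line.split():
        if word in pos_list:
            pos_count.add(1)            # workers may only add to an accumulator
    return line

sc.textFile("Tweets_tokenised.txt").map(tally).count()  # an action forces evaluation
print(pos_count.value)                  # readable only on the driver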
In your case, if your pos_words and neg_words lists are not too big, you could define them as broadcast variables and then count by sentiment.
Something like:
pos = sc.broadcast(["good", "gold", "silver"])
neg = sc.broadcast(["evil", "currency", "fiat"])
# I will suppose that every record is a different tweet, stored as a tuple of words.
tweets = sc.parallelize([("banking", "is", "evil"), ("gold", "is", "good")])
(tweets
.flatMap(lambda x: x)
.map(lambda x: (1 if x in pos.value else -1 if x in neg.value else 0, 1))
.reduceByKey(lambda a, b: a + b).take(3))
# notice that I count neutral words.
# output -> [(0, 3), (1, 2), (-1, 1)]
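To build the broadcast lists from the files in your question, a sketch along these lines should work; note that readlines() keeps the trailing newline on every word, so strip it or membership tests like word in pos_words will never match:

# Building the broadcast lists from the question's files (a sketch).
with open("positive.txt") as f:
    pos = sc.broadcast([w.strip() for w in f])
with open("negative.txt") as f:
    neg = sc.broadcast([w.strip() for w in f])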
PS: If your idea was to count the positive and negative words per message, the approach varies only slightly.
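A hedged sketch of that per-message variant, reusing the tweets RDD and the pos/neg broadcasts from above and keeping a message id in the key so counts are grouped per tweet (zipWithIndex supplies the id):

# Per-message variant: key by (message_id, sentiment) instead of sentiment alone.
per_msg = (tweets
           .zipWithIndex()                          # (words, message_id)
           .flatMap(lambda wi: [(wi[1], w) for w in wi[0]])
           .map(lambda iw: ((iw[0], 1 if iw[1] in pos.value
                             else -1 if iw[1] in neg.value else 0), 1))
           .reduceByKey(lambda a, b: a + b))
print(per_msg.collect())
# e.g. [((0, 0), 2), ((0, -1), 1), ((1, 1), 2), ((1, 0), 1)] (order may vary)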
Upvotes: 2