Reputation: 1
I am trying to create a DataFrame using PySpark and Hive on a Cloudera VM, but every time I get this error:
Traceback (most recent call last):
  File "/home/cloudera/Desktop/TwitterSentimentAnalysis/SentimentAnalysis.py", line 98, in <module>
    .reduceByKey(lambda a, b: a+b) \
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/context.py", line 62, in toDF
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/context.py", line 404, in createDataFrame
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/context.py", line 285, in _createFromRDD
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/context.py", line 229, in _inferSchema
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1320, in first
ValueError: RDD is empty
INFO spark.SparkContext: Invoking stop() from shutdown hook
What should I do to fix this error?
Edit 2 - here is the full script:

from pyspark import SparkContext
from pyspark.sql import HiveContext
import re
import math

sc = SparkContext(appName="PythonSentimentAnalysis")
sqlCtx = HiveContext(sc)

# Load the AFINN word list into a dict of {word: score}
filenameAFINN = "/home/cloudera/Desktop/TwitterSentimentAnalysis/AFINN/AFINN-111.txt"
afinn = dict(map(lambda (w, s): (w, int(s)),
                 [ws.strip().split('\t') for ws in open(filenameAFINN)]))

# Build (alias, candidate) pairs from the mapping file
filenameCandidate = "file:///home/cloudera/Desktop/TwitterSentimentAnalysis/Candidates/Candidate Mapping.txt"
candidates = sc.textFile(filenameCandidate) \
    .map(lambda x: (x.strip().split(",")[0], x.strip().split(","))) \
    .flatMapValues(lambda x: x) \
    .map(lambda y: (y[1], y[0])) \
    .distinct()

pattern_split = re.compile(r"\W+")

tweets = sqlCtx.sql("select id, text, entities.user_mentions.name from incremental_tweets")
def sentiment(text):
    words = pattern_split.split(text.lower())
    sentiments = map(lambda word: afinn.get(word, 0), words)
    if sentiments:
        # Normalise the summed score by the square root of the word count
        sentiment = float(sum(sentiments)) / math.sqrt(len(sentiments))
    else:
        sentiment = 0
    return sentiment
# Score each tweet, fan the score out to every mentioned name,
# and total the scores per name: (mention name, summed score)
sentimentTuple = tweets.rdd.map(lambda r: [r.id, r.text, r.name]) \
    .map(lambda r: [sentiment(r[1]), r[2]]) \
    .flatMapValues(lambda x: x) \
    .map(lambda y: (y[1], y[0])) \
    .reduceByKey(lambda x, y: x + y) \
    .sortByKey(ascending=True)

# Map mention names to candidates and aggregate the score per candidate
scoreDF = sentimentTuple.join(candidates) \
    .map(lambda (x, y): (y[1], y[0])) \
    .reduceByKey(lambda a, b: a + b) \
    .toDF()

scoreRenameDF = scoreDF.withColumnRenamed("_1", "Candidate").withColumnRenamed("_2", "Score")

sqlCtx.registerDataFrameAsTable(scoreRenameDF, "SCORE_TEMP")
sqlCtx.sql("INSERT OVERWRITE TABLE candidate_score SELECT Candidate, Score FROM SCORE_TEMP")
Upvotes: 0
Views: 918
Reputation: 33
Check whether the intermediate RDDs are created properly by printing a few entries from each one with the code below:
for i in rdd.take(10):
    print(i)
This will show the first 10 entries of your RDD.
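For this particular pipeline, you could apply that check stage by stage; a sketch assuming the variable names from the question (tweets, sentimentTuple, candidates):

# Print a sample from each intermediate RDD; the first one that
# prints nothing is the stage where the data is lost.
for name, rdd in [("tweets", tweets.rdd),
                  ("candidates", candidates),
                  ("sentimentTuple", sentimentTuple),
                  ("joined", sentimentTuple.join(candidates))]:
    print("--- %s ---" % name)
    for i in rdd.take(10):
        print(i)

If tweets itself prints nothing, the problem is upstream in the Hive table; if the join is the first empty stage, the name keys on the two sides do not match.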
Upvotes: 0