I am using Python 2 and Spark. I am following the instructions at this link to count words from a Twitter stream: https://github.com/Ruthvicp/CS5590_BigDataProgramming/wiki/Lab-Assignment-4----Spark-MLlib-classification-algorithms,-word-count-on-twitter-streaming. I have two files. The first is TSWordCount:
import findspark
findspark.init()
import pyspark
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import desc
from collections import namedtuple
import os
os.environ["SPARK_HOME"] = "C:\\spark-2.3.1-bin-hadoop2.7\\spark-2.3.1-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "C:\\winutils\\"
def main():
    sc = SparkContext(appName="Countwords1234")
    wordcount = {}
    ssc = StreamingContext(sc, 5)
    lines = ssc.socketTextStream("localhost", 5678)
    fields = ("word", "count")
    Tweet = namedtuple('Text', fields)
    # lines = socket_stream.window(20)
    counts = lines.flatMap(lambda text: text.split(" "))\
        .map(lambda x: (x, 1))\
        .reduceByKey(lambda a, b: a + b).map(lambda rec: Tweet(rec[0], rec[1]))
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()
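To see what the DStream pipeline in main() computes for each 5-second batch, here is a plain-Python sketch (no Spark needed) that applies the same steps to a list of lines: flatMap splits on single spaces, map pairs each word with 1, reduceByKey sums those pairs, and the final map wraps each pair in the Tweet namedtuple. The function name count_words and the sample batch are hypothetical; Spark distributes this work and does not guarantee any output order.

```python
from collections import namedtuple
from itertools import chain

# Same record type as in TSWordCount (its type name is 'Text')
Tweet = namedtuple('Text', ("word", "count"))

def count_words(batch):
    """Hypothetical plain-Python stand-in for one batch of the DStream pipeline."""
    # flatMap: split every line on single spaces and flatten the result
    words = chain.from_iterable(line.split(" ") for line in batch)
    # map + reduceByKey: add up a 1 for every occurrence of the same word
    totals = {}
    for word in words:
        totals[word] = totals.get(word, 0) + 1
    # final map: wrap each (word, count) pair in the namedtuple
    return [Tweet(w, c) for w, c in totals.items()]

if __name__ == "__main__":
    sample_batch = ["fifa world cup", "fifa final"]
    for rec in sorted(count_words(sample_batch)):
        print(rec)
```

Like text.split(" ") in the Spark job, this keeps empty strings produced by repeated spaces, so they are counted as a "word" too.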
When I run this file it succeeds, and the output is "Listening to port 5678". My second file is TwitterListener:
import findspark
findspark.init()
import pyspark
import tweepy
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener
import socket
import json
import time
consumer_key = '30f****'
consumer_secret = 'smu7B******'
access_token = '153*******'
access_secret = 'QIizsB***'
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_secret)
class TweetsListener(StreamListener):
    def __init__(self, csocket):
        self.client_socket = csocket

    def on_data(self, data):
        try:
            msg = json.loads(data)
            print(msg['text'].encode('utf-8'))
            self.client_socket.send(msg['text'].encode('utf-8'))
            return True
        except BaseException as e:
            print("Error on_data: %s" % str(e))
        return True

    def on_error(self, status):
        print(status)
        return True

def sendData(c_socket):
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_secret)
    twitter_stream = Stream(auth, TweetsListener(c_socket))
    twitter_stream.filter(track=['fifa'])
if __name__ == "__main__":
    s = socket.socket()         # Create a socket object
    host = "localhost"          # Listen on the local machine only
    port = 5678                 # Reserve a port for your service.
    s.bind((host, port))        # Bind to the port
    print("Listening on port: %s" % str(port))
    s.listen(5)                 # Now wait for client connection.
    c, addr = s.accept()        # Establish connection with client.
    print("Received request from: " + str(addr))
    time.sleep(5)
    sendData(c)
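One thing worth checking in TwitterListener: PySpark documents socketTextStream as reading the received bytes as UTF-8, newline-delimited lines. on_data sends each tweet's text with no terminator, so consecutive tweets can end up glued into a single record (and a tweet that contains its own newlines is split). A minimal sketch, assuming you want exactly one record per tweet: the helper frame_tweet is hypothetical and could be used as self.client_socket.send(frame_tweet(msg['text'])). The round trip below uses a local socket pair instead of Twitter and Spark.

```python
import socket

def frame_tweet(text):
    """Hypothetical helper: one tweet -> one newline-terminated UTF-8 record."""
    # Replace embedded line breaks so the tweet stays a single line
    one_line = text.replace(u"\r", u" ").replace(u"\n", u" ")
    return (one_line + u"\n").encode("utf-8")

def roundtrip(tweets):
    """Send framed tweets through a socket pair and read them back line by line."""
    sender, receiver = socket.socketpair()
    try:
        for tweet in tweets:
            sender.sendall(frame_tweet(tweet))
        sender.close()  # signal end of data to the reader
        stream = receiver.makefile("rb")
        lines = [raw.decode("utf-8").rstrip(u"\n") for raw in stream.readlines()]
        stream.close()
    finally:
        receiver.close()
    return lines

if __name__ == "__main__":
    for line in roundtrip([u"Goal! #fifa", u"line one\nline two"]):
        print(line)
```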
As you can see, TwitterListener listens on localhost:5678. In TSWordCount I create the context with SparkContext(appName=""); I thought this should be my Twitter app's name, so I put Countwords1234 there. Then I connect to the port with ssc.socketTextStream("localhost", 5678). But when I run TSWordCount I get this error:
Cannot run multiple SparkContexts at once; existing SparkContext(app=PySparkShell, master=local[*]) created by
I searched for the error and found a suggestion to use sc.stop(), so I put it after ssc.awaitTermination(). But it didn't work. What should I do now?
I found an answer. I replaced
sc = SparkContext(appName="Countwords1234")
with
sc = SparkContext.getOrCreate()
and everything worked. I still don't understand why, but at the end of the day the result is what matters, LOL.
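Why this works: the pyspark shell (app=PySparkShell in the error) already creates a SparkContext named sc when it starts, presumably because the script is being run from an environment launched through pyspark, and PySpark allows only one active SparkContext per process. SparkContext(appName=...) tries to build a second one and fails; SparkContext.getOrCreate() returns the one that already exists. That is also why sc.stop() after ssc.awaitTermination() could not help: the error is raised when the context is constructed, before that line, and awaitTermination() blocks until the streaming context stops. Note that with getOrCreate() the existing shell context is reused, so Countwords1234 is not applied; appName is only a label for the Spark application (shown in the Spark UI), not the Twitter app name. The sketch below mimics this one-active-instance rule with a hypothetical ToyContext class; it is an illustration, not PySpark's source.

```python
class ToyContext(object):
    """Hypothetical stand-in that mimics the one-active-SparkContext rule."""
    _active = None  # the single active instance, if any

    def __init__(self, app_name):
        # Like SparkContext(appName=...): refuse a second active instance
        if ToyContext._active is not None:
            raise ValueError(
                "Cannot run multiple contexts at once; existing context(app=%s)"
                % ToyContext._active.app_name)
        self.app_name = app_name
        ToyContext._active = self

    @classmethod
    def get_or_create(cls, app_name="default"):
        # Like SparkContext.getOrCreate(): reuse the active instance if there is one
        if cls._active is not None:
            return cls._active
        return cls(app_name)

    def stop(self):
        # Releasing the slot allows a new instance to be created later
        if ToyContext._active is self:
            ToyContext._active = None

if __name__ == "__main__":
    shell_sc = ToyContext("PySparkShell")  # what the shell did at startup
    try:
        ToyContext("Countwords1234")       # mirrors SparkContext(appName=...)
    except ValueError as err:
        print(err)
    sc = ToyContext.get_or_create()        # mirrors SparkContext.getOrCreate()
    print("reused existing context: %s (app=%s)" % (sc is shell_sc, sc.app_name))
```

Running the script as a standalone program (for example with spark-submit) instead of inside the shell would leave no pre-existing context, so the original SparkContext(appName="Countwords1234") line should then work.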