Reputation: 95
I am new at Spark and I absolutely need some help for classifying tweets from a Kafka Stream. Following I will explain the step processes i have done until now as well as the point where I'm stuck.
I hope some of you guys can help me out with this.
Thanks in advance.
The context is the following:
I have a simple Kafka Producer which simulates a tweet's stream (read from a file) and a TweetAnalyzer Consumer which should process and classify the tweets on a Spark Streaming Context, as soon as it receive them.
In order to classify the received tweets, I have earlier built-up and stored on the disk a TF-IDF and Naive Bayes models which are loaded before the Spark Streaming Context starts.
For each tweet processed (stemming, punctuation, etc), I should compute its TF-IDF vector (feature vector) and classify it by exploiting respectively the IDF and Naive Bayes Models previously loaded.
Going straight to the point, my problem occurs when I have to transform the tweet's term frequency vectors (TF) to its TF-IDF vectors.
This is the code:
Kafka Producer
text_file = list(
csv.reader(
open('/twitterDataset/twitter/test_data.txt', 'rU')
)
)
for row in text_file:
time.sleep(1)
jd = json.dumps(row).encode('ascii')
producer.send(kafka_topic,jd)
TweetAnalyzer
#setting configuration
...
#reading configuration
...
#setting Kafka configuration
...
# Create Spark context
sc = SparkContext(
appName = app_name,
master = spark_master
)
# Create Streaming context
ssc = StreamingContext(
sc,
int(spark_batch_duration)
)
# Loading TF MODEL and compute TF-IDF
....
kafkaParams = {'metadata.broker.list"': kafka_brokers}
# Create direct kafka stream with brokers and topics
kvs = KafkaUtils.createDirectStream(
ssc,
[kafka_topic],
{"metadata.broker.list": kafka_brokers}
)
obj1 = TweetPreProcessing()
lines = kvs.map(lambda x: x[1])
tweet = lines.flatMap(obj1.TweetBuilder)
hashingTF = HashingTF()
#computing TF for each tweet
tf_tweet = tweet.map(lambda tup: hashingTF.transform(tup[0:]))\
.map(lambda x: IDF().fit(x))
.pprint()
ssc.start()
ssc.awaitTermination()
In the last lines of code I cannot apply the IDF().fit(x) function on x since Spark expects an "RDD of term frequency vectors" whereas in this point I have a "Trasformed DStream" due the the Streaming Spark Context.
I've tried to use either the transform() or foreachRDD() function instead of map(), but i don't know how to return correctly a new DStream after the transformation.
For example:
tf_tweet = tweet.map(lambda tup: hashingTF.transform(tup[0:]))\
.transform(classify_tweet)
.pprint()
def classify_tweet(tf):
#compute TF-IDF of the tweet
idf = IDF().fit(tf)
tf_idf = idf.transform(tf)
#print(tf_idf.collect())
return idf
If I run the code using the transform function, Spark triggers (at the top of the back-trace) this error:
File "/workspace_spark/spark-1.6.2-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/util.py", line 67, in call return r._jrdd
AttributeError: 'IDFModel' object has no attribute '_jrdd'
But if I omit the return statement and simply print the tf_idf vector it gives me the correct output which looks like this:
[SparseVector(1048576, {164998: 0.0, 364601: 0.0, 924192: 0.0, 963449: 0.0})]
[SparseVector(1048576, {251465: 0.0, 821055: 0.0, 963449: 0.0})]
[SparseVector(1048576, {234762: 0.0, 280973: 0.0, 403903: 0.0, 712732: 0.0, 861562: 0.0, 1040690: 0.0})] ...
If i've got it right, I think the problem is that I cannot return a SparseVector when it expects a DStream.
Anyhow, is there a solution for this problem ?
I'd be very thankful if somebody can help me out with this, I am tragically stuck.
Thank you
Upvotes: 1
Views: 970
Reputation:
Return transformed tf_idf
:
>>> def classify_tweet(tf):
... return IDF().fit(tf).transform(tf)
Upvotes: 0