Davide Nardone

Reputation: 95

Spark Streaming - Classification of tweets' stream from Kafka

I am new to Spark and I really need some help classifying tweets from a Kafka stream. In the following I will explain the steps I have taken so far, as well as the point where I'm stuck.

I hope some of you guys can help me out with this.

Thanks in advance.

The context is the following:

I have a simple Kafka Producer which simulates a stream of tweets (read from a file) and a TweetAnalyzer Consumer which should process and classify the tweets in a Spark Streaming context as soon as it receives them.

In order to classify the received tweets, I have previously built TF-IDF and Naive Bayes models and stored them on disk; they are loaded before the Spark Streaming context starts.

For each processed tweet (stemming, punctuation removal, etc.), I should compute its TF-IDF feature vector and classify it using, respectively, the IDF and Naive Bayes models loaded earlier.
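(For context on the feature vector: HashingTF builds the term-frequency vector with the hashing trick, mapping each term into one of N buckets and counting per bucket. A minimal pure-Python sketch of the idea follows; hashing_tf is a hypothetical helper for illustration, not the actual MLlib implementation:)

```python
from collections import Counter

def hashing_tf(terms, num_features=1048576):
    # Hash each term into one of num_features buckets and count
    # occurrences per bucket: a sparse term-frequency vector,
    # analogous to what HashingTF().transform produces.
    return dict(Counter(hash(t) % num_features for t in terms))

tf = hashing_tf(["spark", "kafka", "spark"])
# the bucket that "spark" hashes to holds a count of 2
```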

Getting straight to the point, my problem occurs when I have to transform a tweet's term-frequency vector (TF) into its TF-IDF vector.

This is the code:

Kafka Producer

text_file = list(
    csv.reader(
        open('/twitterDataset/twitter/test_data.txt', 'rU')
    )
)

for row in text_file:
    time.sleep(1)
    jd = json.dumps(row).encode('ascii')
    producer.send(kafka_topic,jd)
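(As a sanity check of the wire format: json.dumps(row).encode('ascii') produces the bytes Kafka transports, and the consumer recovers the original row with the inverse round trip. Sample row values here are made up for illustration:)

```python
import json

row = ["user123", "some tweet text"]       # a row as csv.reader yields it
payload = json.dumps(row).encode("ascii")  # bytes handed to producer.send

# consumer side: invert the encoding to recover the original row
decoded = json.loads(payload.decode("ascii"))
```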

TweetAnalyzer

#setting configuration
...  
#reading configuration
...
#setting Kafka configuration
...

# Create Spark context
sc = SparkContext(
    appName = app_name,
    master  = spark_master
)

# Create Streaming context
ssc = StreamingContext(
    sc,
    int(spark_batch_duration)
)

# Loading TF MODEL and compute TF-IDF
....

kafkaParams = {'metadata.broker.list': kafka_brokers}

# Create direct kafka stream with brokers and topics
kvs = KafkaUtils.createDirectStream(
    ssc,
    [kafka_topic],
    {"metadata.broker.list": kafka_brokers}
)

obj1 = TweetPreProcessing()

lines = kvs.map(lambda x: x[1])

tweet = lines.flatMap(obj1.TweetBuilder)

hashingTF = HashingTF()

#computing TF for each tweet
tf_tweet = tweet.map(lambda tup: hashingTF.transform(tup[0:]))\
                .map(lambda x: IDF().fit(x))\
                .pprint()

ssc.start()
ssc.awaitTermination()

In the last lines of code I cannot apply the IDF().fit(x) function to x, since Spark expects an "RDD of term frequency vectors" whereas at this point I have a TransformedDStream because of the Spark Streaming context.

I've tried to use either the transform() or the foreachRDD() function instead of map(), but I don't know how to correctly return a new DStream after the transformation.

For example:

tf_tweet = tweet.map(lambda tup: hashingTF.transform(tup[0:]))\
                .transform(classify_tweet)\
                .pprint()

def classify_tweet(tf):

    #compute TF-IDF of the tweet
    idf = IDF().fit(tf)
    tf_idf = idf.transform(tf)

    #print(tf_idf.collect())

    return idf

If I run the code using the transform function, Spark raises this error (at the top of the stack trace):

File "/workspace_spark/spark-1.6.2-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/util.py", line 67, in call
    return r._jrdd
AttributeError: 'IDFModel' object has no attribute '_jrdd'

But if I omit the return statement and simply print the tf_idf vector, I get the correct output, which looks like this:

[SparseVector(1048576, {164998: 0.0, 364601: 0.0, 924192: 0.0, 963449: 0.0})]
[SparseVector(1048576, {251465: 0.0, 821055: 0.0, 963449: 0.0})]
[SparseVector(1048576, {234762: 0.0, 280973: 0.0, 403903: 0.0, 712732: 0.0, 861562: 0.0, 1040690: 0.0})] ...
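(A side note on the all-zero weights above: MLlib's IDF weighting is log((m + 1) / (df + 1)), where m is the number of documents the model was fitted on and df is the term's document frequency. One plausible explanation, given the producer's one-second sleep, is that each batch contains a single tweet, so fitting IDF().fit per batch gives m = df = 1 for every term, hence weight log(1) = 0:)

```python
import math

def mllib_idf(num_docs, doc_freq):
    # MLlib's IDF weighting: log((m + 1) / (df + 1))
    return math.log((num_docs + 1) / (doc_freq + 1))

weight = mllib_idf(1, 1)  # one tweet in the batch, term occurs in it
# weight == 0.0, matching the all-zero SparseVectors above
```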

If I've got it right, I think the problem is that classify_tweet returns the IDFModel itself, whereas transform() expects an RDD back.

Anyhow, is there a solution to this problem?

I'd be very thankful if somebody could help me out with this; I am tragically stuck.

Thank you

Upvotes: 1

Views: 970

Answers (1)

user6022341

Reputation:

transform() expects a function that returns an RDD, so return the transformed tf_idf instead of the fitted IDFModel:

>>> def classify_tweet(tf):
...     return IDF().fit(tf).transform(tf)

Upvotes: 0
