Parag Kuhikar

Reputation: 485

How to run the Twitter popular tags example of Spark Streaming using Scala?

I am new to Spark, so please guide me.

There are lots of examples available related to Spark Streaming using Scala.

You can check them out at https://github.com/apache/incubator-spark/tree/master/examples/src/main/scala/org/apache/spark/streaming/examples.

I want to run TwitterPopularTags.scala.

I am not able to set the Twitter login details for this example.

http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html#linking-with-spark-streaming

I successfully ran the network count example.

But when I execute
./run-example org.apache.spark.streaming.examples.TwitterPopularTags local[2], it shows me an authentication failure...

I set the Twitter login details before initializing the StreamingContext in TwitterPopularTags.scala, like this:

 System.setProperty("twitter4j.oauth.consumerKey", "####");
 System.setProperty("twitter4j.oauth.consumerSecret", "##");
 System.setProperty("twitter4j.oauth.accessToken", "##");
 System.setProperty("twitter4j.oauth.accessTokenSecret", "##");

Please guide me.

Upvotes: 2

Views: 6201

Answers (2)

sachin thirumala

Reputation: 51

I was not able to open the GitHub link https://github.com/apache/incubator-spark/tree/master/examples/src/main/scala/org/apache/spark/streaming/examples.

However, you can use the code below, which worked for me.

import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.twitter._

/**
 * A Spark Streaming application that receives tweets on certain
 * keywords from the Twitter data source and finds the popular hashtags
 *
 * Arguments: <consumerKey> <consumerSecret> <accessToken> <accessTokenSecret> <keyword_1> ... <keyword_n>
 * <consumerKey>        - Twitter consumer key
 * <consumerSecret>     - Twitter consumer secret
 * <accessToken>        - Twitter access token
 * <accessTokenSecret>  - Twitter access token secret
 * <keyword_1>          - The keyword to filter tweets
 * <keyword_n>          - Any number of keywords to filter tweets
 * 
 * More discussion at stdatalabs.blogspot.com
 * 
 * @author Sachin Thirumala
 */

object SparkPopularHashTags {
  val conf = new SparkConf().setMaster("local[4]").setAppName("Spark Streaming - PopularHashTags")
  val sc = new SparkContext(conf)

  def main(args: Array[String]) {

    sc.setLogLevel("WARN")

    // The first four arguments are the OAuth credentials; the rest are filter keywords
    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val filters = args.drop(4)

    // Set the system properties so that the Twitter4j library used by the
    // twitter stream can use them to generate OAuth credentials
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    // Create the StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(sc, Seconds(5))

    // Create the Twitter stream, passing the filter keywords as arguments
    val stream = TwitterUtils.createStream(ssc, None, filters)

    // Split the stream on space and extract hashtags 
    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

    // Get the top hashtags over the previous 60 sec window
    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // Get the top hashtags over the previous 10 sec window
    val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // Print tweets in the current DStream
    stream.print()

    // Print popular hashtags  
    topCounts60.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })
    topCounts10.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    ssc.start()
    ssc.awaitTermination()
  }
} 

Explanation:
setMaster("local[4]") - Make sure to set the master to local mode with at least 2 threads, as one thread is used for receiving the incoming stream and the others are used for processing it.

We count the popular hashtags with the code below:

val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

The above snippet does a word count of the hashtags over the previous 60/10 seconds, as specified by the window duration in reduceByKeyAndWindow, and sorts them in descending order.

reduceByKeyAndWindow is used when we have to apply transformations on data accumulated over previous stream intervals.
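For reference, here is a minimal sketch of the same pattern with an explicit slide duration (the words DStream is a hypothetical source). When the slide duration is omitted, as in the snippet above, it defaults to the batch interval (5 seconds here):

// A minimal sketch, assuming a hypothetical DStream[String] named words.
// Counts each word over the last 60 seconds, recomputing every 10 seconds.
val windowedCounts = words
  .map(word => (word, 1))
  .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(60), Seconds(10))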

Execute the code by passing the four Twitter OAuth tokens and any filter keywords as arguments.
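For example, a hypothetical spark-submit invocation (the jar path and the trailing keyword list are placeholders for your own build and filters):

spark-submit --class SparkPopularHashTags \
  target/scala-2.10/spark-popular-hashtags_2.10-1.0.jar \
  <consumerKey> <consumerSecret> <accessToken> <accessTokenSecret> spark bigdata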

You should see the popular hashtags printed for every 10/60-second interval.
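Based on the format strings in the code, the output for each interval looks like the following (the tags and counts are placeholders):

Popular topics in last 60 seconds (312 total):
#spark (42 tweets)
#bigdata (17 tweets)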

You can check out similar projects integrating Spark Streaming and Storm with Flume and Kafka at the links below:

Spark Streaming:

Spark Streaming part 1: Real time twitter sentiment analysis http://stdatalabs.blogspot.in/2016/09/spark-streaming-part-1-real-time.html

Spark streaming part 2: Real time twitter sentiment analysis using Flume http://stdatalabs.blogspot.in/2016/09/spark-streaming-part-2-real-time_10.html

Spark streaming part 3: Real time twitter sentiment analysis using kafka http://stdatalabs.blogspot.in/2016/09/spark-streaming-part-3-real-time.html

Data guarantees in Spark Streaming with kafka integration http://stdatalabs.blogspot.in/2016/10/data-guarantees-in-spark-streaming-with.html

Storm:

Realtime stream processing using Apache Storm - Part 1 http://stdatalabs.blogspot.in/2016/09/realtime-stream-processing-using-apache.html

Realtime stream processing using Apache Storm and Kafka - Part 2 http://stdatalabs.blogspot.in/2016/10/real-time-stream-processing-using.html

Upvotes: 1

Dronkel

Reputation: 78

Put the file "twitter4j.properties" into the Spark root directory (e.g. spark-0.8.0-incubating) before you run the Twitter examples.

twitter4j.properties:

oauth.consumerKey=***
oauth.consumerSecret=***
oauth.accessToken=***
oauth.accessTokenSecret=***

Worked for me on Mac with the Scala examples.
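With the properties file in place, the original command from the question should then authenticate:

./run-example org.apache.spark.streaming.examples.TwitterPopularTags local[2]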

Upvotes: 2
