subho

Reputation: 591

Twitter data from Spark

I am learning Twitter integration with Spark Streaming.

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.SparkContext._
    import org.apache.spark.streaming.twitter._
    import org.apache.spark.SparkConf

    /**
     * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter
     * stream. The stream is instantiated with credentials and optionally filters supplied by the
     * command line arguments.
     *
     * Run this on your local machine as
     *
     */
    object TwitterPopularTags {
      def main(args: Array[String]) {


        if (args.length < 4) {
          System.err.println("Usage: TwitterPopularTags <consumer key> <consumer secret> " +
            "<access token> <access token secret> [<filters>]")
          System.exit(1)
        }

        StreamingExamples.setStreamingLogLevels()

        val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
        val filters = args.takeRight(args.length - 4)

        // Set the system properties so that the Twitter4j library used by the Twitter stream
        // can use them to generate OAuth credentials
        System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
        System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
        System.setProperty("twitter4j.oauth.accessToken", accessToken)
        System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

        val sparkConf = new SparkConf().setAppName("TwitterPopularTags").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
        val stream = TwitterUtils.createStream(ssc, None, filters)//Dstream

        val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

        val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
                         .map{case (topic, count) => (count, topic)}
                         .transform(_.sortByKey(false))

        val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
                         .map{case (topic, count) => (count, topic)}
                         .transform(_.sortByKey(false))


        // Print popular hashtags
        topCounts60.foreachRDD(rdd => {
          val topList = rdd.take(10)
          println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
          topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
        })

        topCounts10.foreachRDD(rdd => {
          val topList = rdd.take(10)
          println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
          topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
        })

        ssc.start()
        ssc.awaitTermination()
      }
    }

I don't fully understand the two lines of code below:

    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val filters = args.takeRight(args.length - 4)

Can someone please explain these two lines to me?

Thanks and Regards,

Upvotes: 2

Views: 328

Answers (1)

Tzach Zohar

Reputation: 37832

    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)

args is an Array; take(4) returns a sub-Array with the first (left-most) four elements. Assigning these four elements into Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) means that the val consumerKey will hold the value of the first element; consumerSecret will hold the value of the second, and so on. This is a neat Scala trick of "unpacking" an Array (can be done with other collections too) into named values.
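
To make the unpacking concrete, here is a minimal sketch using a made-up argument list. The object name UnpackDemo and the sample values are illustrative only and are not part of the question's code. The left-hand side is a pattern, so it matches only an Array of exactly four elements; with fewer elements Scala throws a scala.MatchError at runtime, which is one reason the args.length check at the top of main matters.

```scala
object UnpackDemo {
  // Hypothetical stand-in for the command-line arguments (illustrative values only)
  val args: Array[String] =
    Array("key", "secret", "token", "tokenSecret", "#scala", "#spark")

  // The left-hand side is a pattern: it matches an Array of exactly four
  // elements and binds each element to a name, in order.
  val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)

  // Equivalent, more verbose form of the first binding using plain indexing
  val firstFour: Array[String] = args.take(4)
  val consumerKeyByIndex: String = firstFour(0)

  def main(unused: Array[String]): Unit = {
    println(s"$consumerKey $consumerSecret $accessToken $accessTokenSecret")
  }
}
```

The indexed form does the same work for a single value; the pattern simply binds all four names in one statement.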

    val filters = args.takeRight(args.length - 4)

takeRight(n) returns a sub-Array from the right, meaning the last n elements in the array. Here, an Array with everything but the first four elements is assigned into a new value named filters.
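
A small sketch of the same call on a made-up argument list may help; TakeRightDemo and the sample values are illustrative assumptions, not part of the original program. It also shows that args.drop(4) selects the same elements, which some readers may find easier to follow.

```scala
object TakeRightDemo {
  // Hypothetical arguments: four credentials followed by two filter terms
  val args: Array[String] =
    Array("key", "secret", "token", "tokenSecret", "#scala", "#spark")

  // args.length is 6, so this keeps the last 6 - 4 = 2 elements
  val filters: Array[String] = args.takeRight(args.length - 4)

  // drop(4) skips the first four elements and keeps the rest
  val filtersViaDrop: Array[String] = args.drop(4)

  // With exactly four arguments, nothing is left over for the filters
  val noFilters: Array[String] = args.take(4).takeRight(0)

  def main(unused: Array[String]): Unit = {
    println(filters.mkString(", "))
    println(filters.sameElements(filtersViaDrop))
    println(noFilters.isEmpty)
  }
}
```

When only the four credentials are passed, filters is an empty Array.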

Upvotes: 4
