Reputation: 485
I am new to Spark so please guide.
There are lots of examples available related to Spark Streaming using Scala.
You can check them out at https://github.com/apache/incubator-spark/tree/master/examples/src/main/scala/org/apache/spark/streaming/examples.
I want to run TwitterPopularTags.scala.
I am not able to set the twitter login details for this example.
I have successfully run the network word count example.
But when I execute
./run-example org.apache.spark.streaming.examples.TwitterPopularTags local[2]
it shows me an authentication failure issue...
I set the Twitter login details before initializing the streaming context in TwitterPopularTags.scala, like this:
System.setProperty("twitter4j.oauth.consumerKey", "####");
System.setProperty("twitter4j.oauth.consumerSecret", "##");
System.setProperty("twitter4j.oauth.accessToken", "##");
System.setProperty("twitter4j.oauth.accessTokenSecret", "##");
Please guide.
Upvotes: 2
Views: 6201
Reputation: 51
I was not able to open the GitHub link https://github.com/apache/incubator-spark/tree/master/examples/src/main/scala/org/apache/spark/streaming/examples.
However, you can use the code below, which worked for me.
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.SparkContext._ // pair-RDD implicits (needed on older Spark versions)
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.flume._ // only needed for the commented-out Flume alternative below
/**
 * A Spark Streaming application that receives tweets on certain
 * keywords from the Twitter data source and finds the popular hashtags.
 *
 * Arguments: <consumerKey> <consumerSecret> <accessToken> <accessTokenSecret> <keyword_1> ... <keyword_n>
 * <consumerKey> - Twitter consumer key
 * <consumerSecret> - Twitter consumer secret
 * <accessToken> - Twitter access token
 * <accessTokenSecret> - Twitter access token secret
 * <keyword_1> - The first keyword to filter tweets
 * <keyword_n> - Any number of additional keywords to filter tweets
 *
 * More discussion at stdatalabs.blogspot.com
 *
 * @author Sachin Thirumala
 */
object SparkPopularHashTags {
val conf = new SparkConf().setMaster("local[4]").setAppName("Spark Streaming - PopularHashTags")
val sc = new SparkContext(conf)
def main(args: Array[String]) {
sc.setLogLevel("WARN")
val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
val filters = args.takeRight(args.length - 4)
// Set the system properties so that the Twitter4j library used by the twitter stream
// can use them to generate OAuth credentials
System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
// Create a StreamingContext with a batch interval of 5 seconds
val ssc = new StreamingContext(sc, Seconds(5))
// Pass the filter keywords as arguments
// val stream = FlumeUtils.createStream(ssc, args(0), args(1).toInt)
val stream = TwitterUtils.createStream(ssc, None, filters)
// Split the stream on space and extract hashtags
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
// Get the top hashtags over the previous 60 sec window
val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
.map { case (topic, count) => (count, topic) }
.transform(_.sortByKey(false))
// Get the top hashtags over the previous 10 sec window
val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
.map { case (topic, count) => (count, topic) }
.transform(_.sortByKey(false))
// Print tweets from the current DStream
stream.print()
// Print popular hashtags
topCounts60.foreachRDD(rdd => {
val topList = rdd.take(10)
println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
})
topCounts10.foreachRDD(rdd => {
val topList = rdd.take(10)
println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
})
ssc.start()
ssc.awaitTermination()
}
}
Explanation:
setMaster("local[4]")
- Make sure to set the master to local mode with at least 2 threads, as one thread is used for receiving the incoming stream and at least one other is needed for processing it.
We count the popular hashtags with the below code:
val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
.map { case (topic, count) => (count, topic) }
.transform(_.sortByKey(false))
The above snippet does a word count of the hashtags over the previous 60/10 seconds, as specified in reduceByKeyAndWindow, and sorts them in descending order.
reduceByKeyAndWindow is used when we have to apply transformations on data accumulated over previous stream intervals.
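If the window is long relative to the batch interval, Spark Streaming also offers an incremental overload of reduceByKeyAndWindow that takes an inverse reduce function, so each slide only adds the new batch and subtracts the one leaving the window. A sketch of that variant (hypothetical, not part of the example above; it reuses the hashTags DStream and requires checkpointing):
ssc.checkpoint("checkpoint/") // required by the incremental overload; the directory name is arbitrary
val topCounts60Incremental = hashTags.map((_, 1))
  // (_ - _) subtracts the counts of batches sliding out of the 60-second window
  .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(5))
  .map { case (topic, count) => (count, topic) }
  .transform(_.sortByKey(false))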
Execute the code by passing the four Twitter OAuth tokens, followed by any filter keywords, as arguments:
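For example, a sketch of the launch command (the jar name sparkpopularhashtags.jar and the keyword are hypothetical placeholders):
spark-submit --class SparkPopularHashTags sparkpopularhashtags.jar \
  <consumerKey> <consumerSecret> <accessToken> <accessTokenSecret> "#spark"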
You should see the popular hashtags over every 10/60 second interval.
You can check out similar projects integrating Spark Streaming and Storm with Flume and Kafka at the links below:
Spark Streaming:
Spark Streaming part 1: Real time twitter sentiment analysis http://stdatalabs.blogspot.in/2016/09/spark-streaming-part-1-real-time.html
Spark Streaming part 2: Real time twitter sentiment analysis using Flume http://stdatalabs.blogspot.in/2016/09/spark-streaming-part-2-real-time_10.html
Spark Streaming part 3: Real time twitter sentiment analysis using Kafka http://stdatalabs.blogspot.in/2016/09/spark-streaming-part-3-real-time.html
Data guarantees in Spark Streaming with Kafka integration http://stdatalabs.blogspot.in/2016/10/data-guarantees-in-spark-streaming-with.html
Storm:
Realtime stream processing using Apache Storm - Part 1 http://stdatalabs.blogspot.in/2016/09/realtime-stream-processing-using-apache.html
Realtime stream processing using Apache Storm and Kafka - Part 2 http://stdatalabs.blogspot.in/2016/10/real-time-stream-processing-using.html
Upvotes: 1
Reputation: 78
Put the file "twitter4j.properties" into the Spark root directory (e.g. spark-0.8.0-incubating) before you run the Twitter examples; Twitter4J looks for this file in the current working directory, so this works when the examples are launched from there.
twitter4j.properties:
oauth.consumerKey=***
oauth.consumerSecret=***
oauth.accessToken=***
oauth.accessTokenSecret=***
Worked for me on Mac with the Scala examples.
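Alternatively, if you prefer to keep the credentials in code instead of a properties file, you can build the Twitter4J configuration yourself and pass an explicit authorization to the stream. A sketch, assuming an existing StreamingContext ssc and a filters sequence (the placeholder values are yours to fill in):
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
import org.apache.spark.streaming.twitter.TwitterUtils

val cb = new ConfigurationBuilder()
  .setOAuthConsumerKey("***")
  .setOAuthConsumerSecret("***")
  .setOAuthAccessToken("***")
  .setOAuthAccessTokenSecret("***")
val auth = new OAuthAuthorization(cb.build())
// Passing Some(auth) instead of None avoids relying on system properties or twitter4j.properties
val stream = TwitterUtils.createStream(ssc, Some(auth), filters)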
Upvotes: 2