Sampat Kumar

Reputation: 502

Spark Streaming Twitter createStream Issue

I am trying to stream data from Twitter using Spark Streaming, but the
code below fails to compile with the error shown after it.

import org.apache.spark.streaming.twitter._
import twitter4j.auth._
import twitter4j.conf._
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val ssc = new StreamingContext(sc, Seconds(10))
val cb = new ConfigurationBuilder
cb.setDebugEnabled(true).setOAuthConsumerKey("").setOAuthConsumerSecret("").setOAuthAccessToken("").setOAuthAccessTokenSecret("")
val auth = new OAuthAuthorization(cb.build)
val tweets = TwitterUtils.createStream(ssc,auth)

ERROR SCREEN:

val tweets = TwitterUtils.createStream(ssc,auth)
<console>:49: error: overloaded method value createStream with alternatives:
  (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,twitterAuth: twitter4j.auth.Authorization)org.apache.spark.streaming.api.java.JavaReceiverInputDStream[twitter4j.Status] <and>
  (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,filters: Array[String])org.apache.spark.streaming.api.java.JavaReceiverInputDStream[twitter4j.Status] <and>
  (ssc: org.apache.spark.streaming.StreamingContext,twitterAuth: Option[twitter4j.auth.Authorization],filters: Seq[String],storageLevel: org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.dstream.ReceiverInputDStream[twitter4j.Status]
 cannot be applied to (org.apache.spark.streaming.StreamingContext, twitter4j.auth.OAuthAuthorization)
       val tweets = TwitterUtils.createStream(ssc,auth)

Upvotes: 0

Views: 837

Answers (2)

Sampat Kumar

Reputation: 502

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._


object TwitterStream {

def setupLogging() = {
import org.apache.log4j.{Level, Logger}   
val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)   
}

/** Configures Twitter service credentials using twitter.txt in the main
workspace directory */
def setupTwitter() = {
import scala.io.Source

for (line <- Source.fromFile("/Users/sampy/twitter.txt").getLines) {
  val fields = line.split(" ")
  if (fields.length == 2) {
    System.setProperty("twitter4j.oauth." + fields(0), fields(1))
  }
}
}

/** Our main function where the action happens */
def main(args: Array[String]): Unit = {

setupTwitter()


val ssc = new StreamingContext("local[*]", "PopularHashtags", Seconds(5))

setupLogging()

val tweets = TwitterUtils.createStream(ssc, None)
val engTweets = tweets.filter(x => x.getLang() == "en")

val statuses = engTweets.map(status => status.getText)

val tweetwords = statuses.flatMap(tweetText => tweetText.split(" ")) 

val hashtags = tweetwords.filter(word => word.startsWith("#"))

val hashtagKeyValues = hashtags.map(hashtag => (hashtag, 1)) // pair each hashtag with a count of 1


val hashtagCounts =
  hashtagKeyValues.reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(5), Seconds(20))
val sortedResults = hashtagCounts.transform(rdd => rdd.sortBy(x => x._2, false))
sortedResults.saveAsTextFiles("/Users/sampy/tweetsTwitter","txt")

sortedResults.print



ssc.checkpoint("/Users/sampy/checkpointTwitter")
ssc.start()
ssc.awaitTermination()
}  
}
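
Since createStream is called with None here, the credentials come from the twitter4j.oauth.* system properties that setupTwitter() sets. A minimal sketch of that mapping, assuming twitter.txt holds one "name value" pair per line (the file contents are not shown above, so the sample lines and the names TwitterPropsDemo and toProps are only illustrative):

```scala
object TwitterPropsDemo {
  // Turn "name value" lines into twitter4j.oauth.* property pairs,
  // skipping lines that do not have exactly two space-separated fields.
  def toProps(lines: Seq[String]): Map[String, String] =
    lines.map(_.split(" ")).collect {
      case Array(name, value) => ("twitter4j.oauth." + name) -> value
    }.toMap

  def main(args: Array[String]): Unit = {
    // Placeholder values, not real credentials (assumed file layout).
    val sample = Seq(
      "consumerKey AAA",
      "consumerSecret BBB",
      "accessToken CCC",
      "accessTokenSecret DDD",
      "this line has too many fields and is ignored"
    )
    toProps(sample).foreach { case (k, v) => System.setProperty(k, v) }
    println(System.getProperty("twitter4j.oauth.consumerKey"))
  }
}
```

A line that does not split into exactly two fields is skipped silently, which is the same behaviour as the fields.length == 2 check in setupTwitter().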

Upvotes: 0

maasg

Reputation: 37435

The method in the question has this signature:

def createStream(
  ssc: StreamingContext,
  twitterAuth: Option[Authorization],
  filters: Seq[String] = Nil,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
)

We can see that ssc: StreamingContext and twitterAuth: Option[Authorization] are mandatory. The other two parameters have default values and are therefore optional.

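In your case, the argument passed for twitterAuth has the wrong type: the parameter expects an Option[Authorization], but auth is a plain OAuthAuthorization. The other two alternatives listed in the error take a JavaStreamingContext, so they do not match either. Wrap the authorization in Some and the call should look like this: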

val tweets = TwitterUtils.createStream(ssc, Some(auth))
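
To see the type issue in isolation, here is a small self-contained sketch. Authorization, OAuthAuthorization and FakeTwitterUtils below are stand-ins I made up so the snippet compiles without Spark or twitter4j on the classpath; only the parameter shape (an Option[Authorization] in second position, defaults after it) mirrors the signature above, and storageLevel is left out.

```scala
// Hypothetical stand-ins for the twitter4j types, not the real classes.
trait Authorization
final class OAuthAuthorization extends Authorization

object FakeTwitterUtils {
  // Same parameter shape as the Scala overload quoted above (storageLevel omitted).
  def createStream(
      ssc: String,
      twitterAuth: Option[Authorization],
      filters: Seq[String] = Nil
  ): String =
    twitterAuth match {
      case Some(_) => s"stream on $ssc with explicit auth, ${filters.size} filters"
      case None    => s"stream on $ssc with default auth, ${filters.size} filters"
    }
}

object OptionAuthDemo {
  def main(args: Array[String]): Unit = {
    val auth = new OAuthAuthorization
    // FakeTwitterUtils.createStream("ssc", auth)
    // ^ does not compile: an OAuthAuthorization is not an Option[Authorization]
    println(FakeTwitterUtils.createStream("ssc", Some(auth)))
    println(FakeTwitterUtils.createStream("ssc", None))
  }
}
```

With the real TwitterUtils, passing None instead of Some(auth) makes the receiver build its authorization from twitter4j's default configuration, for example the twitter4j.oauth.* system properties.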

Upvotes: 2
