Reputation: 3181
I'm trying to create a stream of tweets in Spark using Scala and Twitter4j. This is a snippet of my code:
object auth{
val config = new twitter4j.conf.ConfigurationBuilder()
.setOAuthConsumerKey("")
.setOAuthConsumerSecret("")
.setOAuthAccessToken("")
.setOAuthAccessTokenSecret("")
.build
}
val conf = new SparkConf().setMaster("local[2]").setAppName("Tutorial")
val ssc = new StreamingContext(conf, Seconds(1))
val twitter_auth = new TwitterFactory(auth.config)
val a = new twitter4j.auth.OAuthAuthorization(auth.config)
val atwitter = twitter_auth.getInstance(a).getAuthorization()
When I try to call createStream:
val tweets = TwitterUtils.createStream(ssc, atwitter, filters, DISK_ONLY_2)
I get this error:
[error] /home/shaza90/Desktop/streaming/scala/Tutorial.scala:30: overloaded method value createStream with alternatives:
[error] (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,twitterAuth: twitter4j.auth.Authorization,filters: Array[String],storageLevel: org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.api.java.JavaReceiverInputDStream[twitter4j.Status] <and>
[error] (ssc: org.apache.spark.streaming.StreamingContext,twitterAuth: Option[twitter4j.auth.Authorization],filters: Seq[String],storageLevel: org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.dstream.ReceiverInputDStream[twitter4j.Status]
[error] cannot be applied to (org.apache.spark.streaming.StreamingContext, twitter4j.auth.Authorization, Seq[String], org.apache.spark.storage.StorageLevel)
[error] val tweets = TwitterUtils.createStream(ssc, atwitter, filters, DISK_ONLY_2)
[error] ^
[error] one error found
[error] (compile:compile) Compilation failed
I don't understand why my call doesn't match either overload. Can you help, please? When I replace atwitter (the authorization object) with None, it compiles successfully.
Upvotes: 2
Views: 2777
Reputation: 373
I think atwitter must be an Option[twitter4j.auth.Authorization] for the call to match the Scala overload of createStream.
You can use:
val atwitter: Option[twitter4j.auth.Authorization] = Some(twitter_auth.getInstance(a).getAuthorization())
instead of
val atwitter = twitter_auth.getInstance(a).getAuthorization()
Alternatively, keep atwitter as it is and pass Some(atwitter) in the createStream call.
Here is the test suite for this API: https://github.com/apache/spark/blob/master/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
Upvotes: 4
Reputation: 13985
As you can see in the error log, the Scala overload has the following signature:
createStream(
ssc: org.apache.spark.streaming.StreamingContext,
twitterAuth: Option[twitter4j.auth.Authorization],
filters: Seq[String],
storageLevel: org.apache.spark.storage.StorageLevel
): org.apache.spark.streaming.dstream.ReceiverInputDStream[twitter4j.Status]
You are trying to use it like this:
val tweets = TwitterUtils.createStream(ssc, atwitter, filters, DISK_ONLY_2)
which means you are calling it with the following argument types:
createStream(
org.apache.spark.streaming.StreamingContext,
twitter4j.auth.Authorization,
Seq[String],
org.apache.spark.storage.StorageLevel
)
Notice the difference? It expects an Option[twitter4j.auth.Authorization] and you are providing a bare twitter4j.auth.Authorization. So you need to wrap your twitter4j.auth.Authorization in an Option by using Some:
val tweets = TwitterUtils.createStream( ssc, Some( atwitter ), filters, DISK_ONLY_2 )
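The same behaviour can be reproduced without Spark or Twitter4j on the classpath. The following is only a minimal sketch: Authorization, StreamingContext, JavaStreamingContext, DummyAuth and FakeTwitterUtils are stand-in names invented for illustration (they are not the real library classes), and the two createStream methods only mimic the shape of the two alternatives listed in the error message.

```scala
// Stand-in types invented for this sketch; NOT the real Spark or Twitter4j classes.
trait Authorization
object DummyAuth extends Authorization
class StreamingContext
class JavaStreamingContext

object FakeTwitterUtils {
  // Shaped like the Scala alternative: auth is an Option, filters is a Seq.
  def createStream(ssc: StreamingContext,
                   auth: Option[Authorization],
                   filters: Seq[String]): String =
    "scala overload, auth defined = " + auth.isDefined

  // Shaped like the Java alternative: bare auth, filters is an Array.
  def createStream(jssc: JavaStreamingContext,
                   auth: Authorization,
                   filters: Array[String]): String =
    "java overload"
}

object OverloadDemo {
  val ssc = new StreamingContext
  val filters = Seq("spark", "scala")
  val auth: Authorization = DummyAuth

  // Does not compile: (StreamingContext, Authorization, Seq[String]) fits neither alternative.
  // FakeTwitterUtils.createStream(ssc, auth, filters)

  // Some(auth) is a Some[Authorization], which conforms to Option[Authorization].
  val withSome: String = FakeTwitterUtils.createStream(ssc, Some(auth), filters)

  // None also conforms to Option[Authorization], which is why passing None compiled.
  val withNone: String = FakeTwitterUtils.createStream(ssc, None, filters)

  def main(args: Array[String]): Unit = {
    println(withSome)
    println(withNone)
  }
}
```

In this sketch the compiler picks the first alternative for both calls, because the first argument is a StreamingContext and the second is some kind of Option. The commented-out call mixes a Scala-side context with a Java-style bare authorization, so neither alternative applies, which matches the error in the question.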
To read more about the Option type, see The Neophyte's Guide to Scala by Daniel Westheide
-> http://danielwestheide.com/blog/2012/12/19/the-neophytes-guide-to-scala-part-5-the-option-type.html which is arguably one of the best Scala resources for someone who already knows the basics of Scala.
Upvotes: 0