Kane

Reputation: 101

scala.MatchError Message whenever I run Scala Object

The following piece of code is part of a Twitter streaming app that I'm building with Spark Streaming:

val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
val filters = args.takeRight(args.length - 4)

// Set the system properties so that Twitter4j library used by twitter stream
// can use them to generate OAuth credentials
System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

Whenever I go to run the program, I get the following error:

Exception in thread "main" scala.MatchError: [Ljava.lang.String;@323659f8 (of class [Ljava.lang.String;)
at SparkPopularHashTags$.main(SparkPopularHashTags.scala:18)
at SparkPopularHashTags.main(SparkPopularHashTags.scala)

Line 18 is:

val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)

I have the Twitter4j.properties file saved in my F:\Software\ItelliJ\Projects\twitterStreamApp\src folder, and it's formatted like so:

oauth.consumerKey=***
oauth.consumerSecret=***
oauth.accessToken=***
oauth.accessTokenSecret=***

where the "*"s are my keys without quotation marks around them (e.g. oauth.consumerKey=h12b31289fh7139fbh138ry).

Can anyone assist me with this please?

import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.{ SparkContext, SparkConf }
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume._
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

object SparkPopularHashTags {
  val conf = new SparkConf().setMaster("local[4]").setAppName("Spark Streaming - PopularHashTags")
  val sc = new SparkContext(conf)

  def main(args: Array[String]) {

    sc.setLogLevel("WARN")

    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
//    val filters = args.takeRight(args.length - 4)


    args.lift(0).foreach { consumerKey =>
      System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    }
    args.lift(1).foreach { consumerSecret =>
      System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    }
    args.lift(2).foreach { accessToken =>
      System.setProperty("twitter4j.oauth.accessToken", accessToken)
    }
    args.lift(3).foreach { accessTokenSecret =>
      System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
    }

    val filters = args.drop(4)
    // Set the system properties so that Twitter4j library used by twitter stream
    // can use them to generate OAuth credentials
//    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
//    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
//    System.setProperty("twitter4j.oauth.accessToken", accessToken)
//    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    // Set the Spark StreamingContext to create a DStream for every 5 seconds
    val ssc = new StreamingContext(sc, Seconds(5))

    val stream = TwitterUtils.createStream(ssc, None, filters)

    // Split the stream on space and extract hashtags
    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

    // Get the top hashtags over the previous 60 sec window
    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // Get the top hashtags over the previous 10 sec window
    val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // print tweets in the correct DStream
    stream.print()

    // Print popular hashtags
    topCounts60.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })
    topCounts10.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

Upvotes: 0

Views: 892

Answers (2)

Tim

Reputation: 27356

This is the problem:

val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)

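This fails when fewer than 4 arguments are passed. In that case args.take(4) returns an array with fewer than 4 elements, and the pattern on the left-hand side only matches an array of exactly 4 elements, so Scala throws a scala.MatchError.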

Instead, you need to test the elements of args individually to make sure they are present. For example:

args.lift(0).foreach { consumerKey =>
  System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
}
args.lift(1).foreach { consumerSecret =>
  System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
}
args.lift(2).foreach { accessToken =>
  System.setProperty("twitter4j.oauth.accessToken", accessToken)
}
args.lift(3).foreach { accessTokenSecret =>
  System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
}

val filters = args.drop(4)
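
If you would rather stop with a clear message than silently skip missing credentials, one possible alternative is to match on args and handle the "too few arguments" case explicitly. This is only a sketch: the TwitterArgs object, the Credentials case class and the parse helper are names I made up for illustration, not part of Spark or Twitter4j.

```scala
object TwitterArgs {
  // Hypothetical container for the four OAuth values (illustration only).
  final case class Credentials(
      consumerKey: String,
      consumerSecret: String,
      accessToken: String,
      accessTokenSecret: String)

  // Returns the credentials plus any remaining filter terms,
  // or an error message when fewer than 4 arguments were supplied.
  def parse(args: Array[String]): Either[String, (Credentials, Seq[String])] =
    args match {
      // Matches 4 or more elements; everything after the 4th ends up in `filters`.
      case Array(ck, cs, at, ats, filters @ _*) =>
        Right((Credentials(ck, cs, at, ats), filters))
      case _ =>
        Left(s"Expected at least 4 arguments but got ${args.length}")
    }

  def main(args: Array[String]): Unit = parse(args) match {
    case Right((creds, filters)) =>
      System.setProperty("twitter4j.oauth.consumerKey", creds.consumerKey)
      System.setProperty("twitter4j.oauth.consumerSecret", creds.consumerSecret)
      System.setProperty("twitter4j.oauth.accessToken", creds.accessToken)
      System.setProperty("twitter4j.oauth.accessTokenSecret", creds.accessTokenSecret)
      println(s"Filters: ${filters.mkString(", ")}")
    case Left(message) =>
      // Fail fast with a readable message instead of a MatchError.
      System.err.println(message)
      sys.exit(1)
  }
}
```

The difference from the lift version is that this one refuses to start when credentials are missing, which may or may not be what you want if you also rely on a properties file.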

Upvotes: 1

QuickSilver

Reputation: 4045

This should only happen when you're not setting your program arguments, or when you're passing too few of them, i.e. fewer than 4.
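
A minimal sketch to reproduce this outside of Spark, assuming nothing beyond the Scala standard library. MatchErrorDemo and destructure are made-up names for illustration. The destructuring line is the same one as in the question, wrapped in Try so the failure can be inspected instead of crashing:

```scala
import scala.util.Try

object MatchErrorDemo {
  // Same destructuring as line 18 of the question, wrapped in Try
  // so that a scala.MatchError is captured rather than thrown.
  def destructure(args: Array[String]): Try[String] = Try {
    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    consumerKey
  }

  def main(args: Array[String]): Unit = {
    // Only 2 arguments: take(4) yields a 2-element array, so the pattern fails.
    println(destructure(Array("k", "s")).isFailure) // true

    // 5 arguments: take(4) yields exactly 4 elements, so the pattern matches.
    println(destructure(Array("k", "s", "t", "ts", "#scala")).isSuccess) // true
  }
}
```

So if you run the object from the IDE without filling in the program arguments of the run configuration, args is empty and you land in the first case.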

Upvotes: 0
