Knows Not Much
Knows Not Much

Reputation: 31576

SparkException: Default partitioner cannot partition array keys

I wrote the following program using spark streaming

object TrendingHashTags {
  def main(args: Array[String]) : Unit = {
    val url = getClass.getResource("/twitterapi.properties")
    val source = Source.fromURL(url)
    val props = new Properties()
    props.load(source.bufferedReader())
    System.setProperty("twitter4j.oauth.consumerKey", props.get("consumer_key").toString)
    System.setProperty("twitter4j.oauth.consumerSecret", props.get("consumer_secret").toString)
    System.setProperty("twitter4j.oauth.accessToken", props.get("access_token").toString)
    System.setProperty("twitter4j.oauth.accessTokenSecret", props.get("access_token_secret").toString)

    val conf = new SparkConf().setAppName("Abhishek Spark Streaming")
    val ssc = new StreamingContext(conf, Seconds(30))
    ssc.checkpoint("checkpoint")
    val tweets = TwitterUtils.createStream(ssc, None)
    val tweetsByLang = tweets.filter(tweet => tweet.getLang == "en-US")
    val tweetText = tweetsByLang.map(t => t.getText)
    val words = tweetText.map(t => t.split("\\s+"))
    val hashTags = words.filter(w => w.startsWith("#")).map(h => (h, 1))
    val tagsWithCounts = hashTags.updateStateByKey{
      (counts : Seq[Int], prevCount : Option[Int]) => prevCount.map{c => c + counts.sum}.orElse{Some(counts.sum)}
    }
    val topHashTags = tagsWithCounts.filter{
      case (t, c) => c > 10
    }
    val sortedTopHashTags = topHashTags.transform{
      rdd => rdd.sortBy({
        case (w, c) => c
      }, false)
    }
    sortedTopHashTags.print(10)
    ssc.start()
    ssc.awaitTermination()
  }
}

But when I run this program using spark-submit spark-submit --class com.abhi.TrendingHashTags --master yarn Foo.jar

I get error

    16/05/12 04:18:00 ERROR scheduler.JobScheduler: Error generating jobs for time 1463026680000 ms
org.apache.spark.SparkException: Default partitioner cannot partition array keys.
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1.apply(PairRDDFunctions.scala:89)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1.apply(PairRDDFunctions.scala:82)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.PairRDDFunctions.combineByKeyWithClassTag(PairRDDFunctions.scala:82)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$groupByKey$1.apply(PairRDDFunctions.scala:506)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$groupByKey$1.apply(PairRDDFunctions.scala:499)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.PairRDDFunctions.groupByKey(PairRDDFunctions.scala:499)
    at org.apache.spark.streaming.dstream.StateDStream.compute(StateDStream.scala:105)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
    at org.apache.spark.streaming.dstream.FilteredDStream.compute(FilteredDStream.scala:35)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
    at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
    at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Does anyone know what is going wrong?

Upvotes: 0

Views: 531

Answers (1)

maasg
maasg

Reputation: 37435

The exception is clear, arrays are being used as keys in a key-based operation.

The code shows that this is not the intentions. Following the types of the DStreams, we can see where things go wrong:

val tweets: DStream[Status] = TwitterUtils.createStream(ssc, None)
val tweetsByLang: DStream[Status] = tweets.filter(tweet => tweet.getLang == "en-US")
val tweetText:DStream[String] = tweetsByLang.map(t => t.getText)
val words[Array[String]] = tweetText.map(t => t.split("\\s+")) // problem here

I guess that what we want is a DStream of words to proceed with the classic word count. Fix with:

val words[String] = tweetText.flatMap(t => t.split("\\s+")) // fixed

Upvotes: 2

Related Questions