Maurizio Cacace

Reputation: 73

Spark Streaming using Kafka: empty collection exception

I'm developing an algorithm using Kafka and Spark Streaming. This is part of my receiver:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val Array(brokers, topics) = args
val sparkConf = new SparkConf().setAppName("Traccia2014")
val ssc = new StreamingContext(sparkConf, Seconds(10))

// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
val slice = 30
val lines = messages.map(_._2) // keep only the message value
val dStreamDst = lines.transform(rdd => {
  // smallest first field in this batch (note: compared as strings)
  val y = rdd.map(x => x.split(",")(0)).reduce((a, b) => if (a < b) a else b)
  // count records per (time bucket of width slice, third field)
  rdd.map(x => (((x.split(",")(0).toInt - y.toInt).toLong / slice).round * slice + " " + x.split(",")(2), 1)).reduceByKey(_ + _)
})
dStreamDst.print()

With this code I get the following error:

 ERROR JobScheduler: Error generating jobs for time 1484927230000 ms
 java.lang.UnsupportedOperationException: empty collection
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$42.apply(RDD.scala:1034)
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$42.apply(RDD.scala:1034)

What does this mean, and how can I solve it? Any help is truly appreciated, thanks in advance.

Update: Solved. Instead of using transform() and print(), I used foreachRDD, which turned out to be the best solution.
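A minimal sketch of the per-batch logic with an empty-batch guard, written over a plain `Seq[String]` so it runs without Spark or Kafka. It assumes the record layout implied by the question (comma-separated, integer first field, key in the third field); `bucketCounts` and `BatchBuckets` are hypothetical names. Inside `foreachRDD` the same guard would be `if (!rdd.isEmpty()) { ... }`, with the `Seq` operations swapped for their RDD counterparts. Unlike the question's code, this compares the first field numerically rather than as strings.

```scala
object BatchBuckets {
  // Hypothetical helper: counts records per "bucketStart key" for one batch.
  // Assumes records look like "time,<ignored>,key" with an integer first field.
  def bucketCounts(batch: Seq[String], slice: Int): Map[String, Int] =
    if (batch.isEmpty) Map.empty // nothing arrived in this interval: skip reduce entirely
    else {
      val fields = batch.map(_.split(","))
      // Smallest timestamp in the batch, compared as integers (safe: batch is non-empty)
      val minTime = fields.map(f => f(0).toInt).reduce((a, b) => math.min(a, b))
      fields
        .map { f =>
          // Integer division puts each record into a bucket of width `slice`
          val bucket = (f(0).toInt - minTime) / slice * slice
          s"$bucket ${f(2)}"
        }
        .groupBy(identity)
        .map { case (k, vs) => k -> vs.size } // same role as reduceByKey(_ + _)
    }

  def main(args: Array[String]): Unit = {
    println(bucketCounts(Seq.empty, 30))
    println(bucketCounts(Seq("100,x,A", "115,y,A", "140,z,B"), 30))
  }
}
```

Returning an empty result for an empty batch, rather than calling `reduce`, is what avoids the `empty collection` exception.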

Upvotes: 1

Views: 447

Answers (1)

jeff

Reputation: 4333

You are encountering this because you are interacting with the DStream through the transform() API. With that method you are given the RDD holding the data for a single batch interval, in your case 10 seconds. Your code fails because during one of those intervals no data arrived, so the RDD you are operating on is empty, and calling reduce() on an empty RDD throws the "empty collection" error.
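The same behaviour can be reproduced with plain Scala collections, which shows the cause without a Kafka feed. This is a minimal sketch with no Spark involved (`EmptyReduceDemo` is a hypothetical name, and Scala's exception message is `empty.reduceLeft` rather than Spark's `empty collection`): `reduce` has no value to return when there are no elements, so it throws, while `reduceOption` returns `None`.

```scala
object EmptyReduceDemo {
  // Returns true if reduce throws on the given batch
  def reduceThrows(batch: Seq[String]): Boolean =
    try { batch.reduce((a, b) => if (a < b) a else b); false }
    catch { case _: UnsupportedOperationException => true }

  // reduceOption yields None instead of throwing when there is nothing to reduce
  def safeMin(batch: Seq[String]): Option[String] =
    batch.reduceOption((a, b) => if (a < b) a else b)

  def main(args: Array[String]): Unit = {
    println(reduceThrows(Seq.empty))     // true
    println(safeMin(Seq.empty))          // None
    println(safeMin(Seq("b", "a", "c"))) // Some(a)
  }
}
```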

Use rdd.isEmpty() to check that the RDD is not empty before invoking reduce():

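val dStreamDst = lines.transform(rdd => {
  // No data in this batch: skip reduce() and return an empty RDD of the result type,
  // so that both branches produce an RDD[(String, Int)]
  if (rdd.isEmpty())
    rdd.sparkContext.emptyRDD[(String, Int)]
  else {
    val y = rdd.map(x => x.split(",")(0)).reduce((a, b) => if (a < b) a else b)
    rdd.map(x => (((x.split(",")(0).toInt - y.toInt).toLong / slice).round * slice + " " + x.split(",")(2), 1)).reduceByKey(_ + _)
  }
})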

Upvotes: 1
