Reputation: 73
I'm developing an algorithm using Kafka and Spark Streaming. This is part of my receiver:
val Array(brokers, topics) = args
val sparkConf = new SparkConf().setAppName("Traccia2014")
val ssc = new StreamingContext(sparkConf, Seconds(10))
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
val slice=30
val lines = messages.map(_._2)
val dStreamDst=lines.transform(rdd => {
  val y= rdd.map(x => x.split(",")(0)).reduce((a, b) => if (a < b) a else b)
  rdd.map(x => (((x.split(",")(0).toInt - y.toInt).toLong/slice).round*slice+" "+(x.split(",")(2)),1)).reduceByKey(_ + _)
})
dStreamDst.print()
When I run it, I get the following error:
ERROR JobScheduler: Error generating jobs for time 1484927230000 ms
java.lang.UnsupportedOperationException: empty collection
at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$42.apply(RDD.scala:1034)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$42.apply(RDD.scala:1034)
What does it mean? How can I solve it? Any help is truly appreciated. Thanks in advance.
Update: Solved. I stopped using transform and print() and switched to foreachRDD, which was the best solution for me.
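For anyone landing here, this is a minimal sketch of what a foreachRDD version could look like. The names BatchReport, sliceCounts and register are hypothetical, and I am assuming the first comma-separated field is an integer timestamp and the third field is the key. It also takes the minimum numerically, whereas the code above compares the timestamps as strings. Note that the empty-batch check is still needed: reduce() or min() on an empty RDD fails no matter which DStream method is used.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper, not part of Spark.
object BatchReport {

  // Per-batch logic: count records per (time slice, third field).
  // Assumes lines look like "<int timestamp>,<anything>,<key>".
  def sliceCounts(rdd: RDD[String], slice: Int): Array[(String, Int)] =
    if (rdd.isEmpty()) {
      // No data arrived in this batch, so there is nothing to reduce.
      Array.empty[(String, Int)]
    } else {
      val fields = rdd.map(_.split(","))
      // Numeric minimum of the first field (assumption: it parses as Int).
      val minTs = fields.map(_(0).toInt).min()
      fields
        .map(f => (s"${(f(0).toInt - minTs) / slice * slice} ${f(2)}", 1))
        .reduceByKey(_ + _)
        .collect()
    }

  // Output operation: print the counts of every non-empty batch on the driver.
  def register(lines: DStream[String], slice: Int): Unit =
    lines.foreachRDD { rdd => sliceCounts(rdd, slice).foreach(println) }
}
```

With the code from the question, this would be called as BatchReport.register(lines, slice) before ssc.start(). collect() brings every count to the driver, so this only makes sense when the number of distinct keys per batch is small.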
Upvotes: 1
Views: 447
Reputation: 4333
You are encountering this because you are interacting with the DStream through the transform() API. That method hands you the RDD holding the data for one batch, in your case one 10-second batch interval. Your code fails because no data arrived during a particular interval, so the RDD you are operating on is empty, and invoking reduce() on an empty RDD throws the "empty collection" error.
Use rdd.isEmpty() to ensure that the RDD is not empty before invoking your operation.
lines.transform(rdd => {
  if (rdd.isEmpty)
    rdd // empty batch: skip the reduce (both branches must return the same RDD type)
  else {
    // rest of transformation
  }
})
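To make that concrete, here is a rough sketch of the guard applied to the transformation from the question. Since that transformation turns an RDD of strings into an RDD of (String, Int) pairs, the empty branch here returns an empty RDD of the pair type instead of the input rdd. SliceCounts, countBySlice and guarded are hypothetical names, and I am assuming the first field is an integer timestamp, so the minimum is taken numerically rather than by string comparison.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper, not part of Spark.
object SliceCounts {

  // Per-batch transformation with an explicit empty-batch guard.
  // Assumes lines look like "<int timestamp>,<anything>,<key>".
  def countBySlice(rdd: RDD[String], slice: Int): RDD[(String, Int)] =
    if (rdd.isEmpty()) {
      // Same element type as the non-empty branch, but with no records.
      rdd.sparkContext.emptyRDD[(String, Int)]
    } else {
      val fields = rdd.map(_.split(","))
      // Safe here: the RDD is known to contain at least one record.
      val minTs = fields.map(_(0).toInt).min()
      fields
        .map(f => (s"${(f(0).toInt - minTs) / slice * slice} ${f(2)}", 1))
        .reduceByKey(_ + _)
    }

  // Plug the guarded function into transform().
  def guarded(lines: DStream[String], slice: Int): DStream[(String, Int)] =
    lines.transform(rdd => countBySlice(rdd, slice))
}
```

In the question's code this would replace the body of dStreamDst, for example val dStreamDst = SliceCounts.guarded(lines, slice), after which dStreamDst.print() should no longer fail on batches that received no messages.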
Upvotes: 1