Reputation: 61
I'm trying to create a DStream from a Kafka server and then apply some transformations to that stream. I have included a check for an empty stream (if(!rdd.partitions.isEmpty)); however, even when no events are being published to the Kafka topic, the else branch is never reached.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
stream.foreachRDD { rdd =>
  if (!rdd.partitions.isEmpty) {
    // Keep only the message values, pull them to the driver, and parse them
    val messages = rdd.map(_._2).collect().toList.map(parser)
    val first = messages(0)
  } else println("empty stream...")
}
// Start the streaming context once, outside the foreachRDD closure
ssc.start()
ssc.awaitTermination()
Is there an alternative check I should use to test whether the stream is empty when using KafkaUtils.createDirectStream rather than createStream?
Upvotes: 3
Views: 1092
Reputation: 149618
Use RDD.isEmpty instead of RDD.partitions.isEmpty. It adds a check to see whether the underlying partitions actually contain any elements:
stream.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    // Stuff
  }
}
The reason RDD.partitions.isEmpty isn't working is that the RDD still contains a partition (the direct stream creates one RDD partition per Kafka topic partition), but that partition itself is empty. From the point of view of partitions, which is an Array[Partition], the RDD isn't empty.
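To see the difference without a Kafka setup, here is a minimal sketch that builds an RDD with four partitions and no elements on a local Spark context. The object name EmptyRddCheck, the helper emptinessChecks, and the local[2] master are assumptions made for illustration, and it assumes Spark 1.3 or later, where RDD.isEmpty is available.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object; the names here are illustrative, not from the question.
object EmptyRddCheck {

  // Returns (partitions.isEmpty, isEmpty) for an RDD that has
  // four partitions but no elements in any of them.
  def emptinessChecks(sc: SparkContext): (Boolean, Boolean) = {
    val rdd = sc.parallelize(Seq.empty[String], 4)
    (rdd.partitions.isEmpty, rdd.isEmpty())
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local master is enough for this demonstration.
    val conf = new SparkConf().setAppName("EmptyRddCheck").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val (noPartitions, noElements) = emptinessChecks(sc)
      println(s"partitions.isEmpty = $noPartitions") // false: the partition array has 4 entries
      println(s"isEmpty            = $noElements")   // true: none of them holds an element
    } finally {
      sc.stop()
    }
  }
}
```

The same situation arises in the streaming case: a batch with no new messages still yields an RDD whose partition array is populated, so only the element-level check reports it as empty.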
Upvotes: 4