Reputation: 5870
I'm using Spark Streaming to consume data from Kafka with a code snippet like:
stream.foreachRDD { rdd => rdd.foreachPartition { ... } }
I'm using foreachPartition because I need to create a connection to HBase, and I don't want to open/close a connection for each record. But I found that even when there is no data in Kafka, Spark Streaming still executes foreachRDD and foreachPartition. This causes many HBase connections to be created even though no data was consumed at all. I really don't like this. How can I make Spark stop doing this when no data is consumed from Kafka?
Upvotes: 1
Views: 393
Reputation: 2234
Simply check that the RDD is not empty. So your code could be:
stream.foreachRDD { rdd => if (!rdd.isEmpty()) rdd.foreachPartition { ... } }
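Putting it together, a minimal sketch of the pattern might look like the following. This assumes `stream` is the DStream obtained from `KafkaUtils.createDirectStream`, and `createHBaseConnection`/`put` are hypothetical placeholders for your own HBase client code (e.g. `ConnectionFactory.createConnection` from `hbase-client`):

```scala
import org.apache.spark.streaming.dstream.DStream

def process(stream: DStream[(String, String)]): Unit = {
  stream.foreachRDD { rdd =>
    // Skip empty micro-batches so no HBase connection is opened needlessly.
    // Note: isEmpty() itself runs a cheap Spark job over the first partition.
    if (!rdd.isEmpty()) {
      rdd.foreachPartition { records =>
        val conn = createHBaseConnection()   // one connection per partition
        try {
          records.foreach { case (key, value) =>
            put(conn, key, value)            // write each record to HBase
          }
        } finally {
          conn.close()                       // always release the connection
        }
      }
    }
  }
}
```

An alternative worth considering is a per-executor connection pool (e.g. a lazily initialized singleton object on the executor), which avoids reopening a connection for every partition of every batch.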
Upvotes: 3