Reputation: 951
tweetStream.foreachRDD((rdd, time) => {
  val count = rdd.count()
  if (count > 0) {
    val fileName = outputDirectory + "/tweets_" + time.milliseconds.toString
    val outputRDD = rdd.repartition(partitionsEachInterval)
    outputRDD.saveAsTextFile(fileName)
  }
})
I am trying to check the count value, or detect an empty RDD, in streaming data the Python way, but I am having a hard time finding one. I also tried the examples from the link below. http://spark.apache.org/docs/latest/streaming-programming-guide.html
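Something along these lines is what I am trying to write in Python (just a sketch of what I think the Scala snippet above would translate to, reusing the same tweetStream, outputDirectory and partitionsEachInterval names; not sure this is the right approach):

# Sketch only: rough PySpark translation of the Scala example above.
# tweetStream, outputDirectory and partitionsEachInterval are assumed to exist.
def save_if_nonempty(time, rdd):
    # In PySpark, the two-argument foreachRDD callback receives the batch time
    # as a datetime object.
    count = rdd.count()
    if count > 0:
        file_name = outputDirectory + "/tweets_" + time.strftime("%Y%m%d%H%M%S")
        rdd.repartition(partitionsEachInterval).saveAsTextFile(file_name)

tweetStream.foreachRDD(save_if_nonempty)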
Upvotes: 3
Views: 10863
Reputation: 3235
You can simply use RDD.isEmpty, as user6910411 suggested:
df.rdd.isEmpty()
It returns a boolean.
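For example, assuming a Spark 2.x SparkSession (the DataFrames below are throwaway ones built purely for illustration):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType

spark = SparkSession.builder.getOrCreate()

# Throwaway DataFrames just to show the call.
df = spark.createDataFrame([(1,)], ["id"])
empty_df = spark.createDataFrame([], StructType([StructField("id", IntegerType())]))

print(df.rdd.isEmpty())        # False - df has one row
print(empty_df.rdd.isEmpty())  # True  - no rows at all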
Upvotes: 1
Reputation: 9
Try using the following snippet of code.
def process_rdd(rdd):
    print(rdd.count())
    print("$$$$$$$$$$$$$$$$$$$$$$")
    # streamrdd_to_df is assumed to be defined elsewhere in this answer's code.
    streamrdd_to_df(rdd)

def empty_rdd():
    print("### The current RDD is empty. Wait for the next complete RDD ###")

# clean is the DStream being processed.
clean.foreachRDD(lambda rdd: empty_rdd() if rdd.count() == 0 else process_rdd(rdd))
Upvotes: 0
Reputation: 330393
RDD.isEmpty returns true if and only if the RDD contains no elements at all:
>>> sc.range(0, 0).isEmpty()
True
>>> sc.range(0, 1).isEmpty()
False
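In the streaming case from the question, the same call can serve as the per-batch guard; a sketch, assuming the tweetStream DStream from the question and a placeholder output path (isEmpty is backed by take(1), so it usually touches only the first partitions and is cheaper than computing a full count):

# Sketch: skip empty micro-batches with isEmpty() instead of count() == 0.
def save_batch(time, rdd):
    if not rdd.isEmpty():
        # "output/" is a placeholder path for this example.
        rdd.saveAsTextFile("output/tweets_" + time.strftime("%Y%m%d%H%M%S"))

tweetStream.foreachRDD(save_batch)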
Upvotes: 5