user1457821

Reputation: 33

How to check if an RDD is empty in Spark Streaming?

I have the following PySpark code, which reads log files from the logs/ directory and should save the results to a text file only when there is data in them, in other words only when the RDD is not empty. I am having trouble implementing that check: I have tried both take(1) and isEmpty(). Since this is a DStream of RDDs, RDD methods can't be applied to it directly. Please let me know if I am missing anything.

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = SparkConf().setMaster("local").setAppName("PysparkStreaming")
sc = SparkContext.getOrCreate(conf=conf)

ssc = StreamingContext(sc, 3)   # streaming batches execute every 3 seconds
lines = ssc.textFileStream('/Users/rocket/Downloads/logs/')  # 'logs/' is the directory to monitor
audit = lines.map(lambda x: x.split('|')[3])
result = audit.countByValue()
#result.pprint()
#result.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
# Print the first ten elements of each RDD generated in this DStream to the console
if result.foreachRDD(lambda rdd: rdd.take(1)):
    result.pprint()
    result.saveAsTextFiles("/Users/rocket/Downloads/output","txt")
else:
    result.pprint()
    print("empty")

Upvotes: 1

Views: 2055

Answers (1)

user11124576

Reputation: 26

The correct structure would be:

import uuid

def process_batch(rdd):
    if not rdd.isEmpty():
        # saveAsTextFile is the RDD method (saveAsTextFiles is its DStream counterpart)
        rdd.saveAsTextFile("/Users/rocket/Downloads/output-{}".format(uuid.uuid4()))


result.foreachRDD(process_batch)

That, however, requires a separate directory for each batch, as the RDD API doesn't have an append mode.
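For completeness, a minimal sketch of the full driver around this handler (the paths, batch interval, and field index are taken from the question; note that nothing runs until ssc.start() is called):

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = SparkConf().setMaster("local").setAppName("PysparkStreaming")
sc = SparkContext.getOrCreate(conf=conf)
ssc = StreamingContext(sc, 3)  # 3-second batches

lines = ssc.textFileStream('/Users/rocket/Downloads/logs/')
audit = lines.map(lambda x: x.split('|')[3])  # extract the audit field
result = audit.countByValue()

result.foreachRDD(process_batch)  # the handler defined above

ssc.start()
ssc.awaitTermination()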

An alternative could be:

from pyspark.sql import SparkSession

def process_batch(rdd):
    if not rdd.isEmpty():
        spark = SparkSession.builder.getOrCreate()  # reuse the driver's session
        lines = rdd.map(str)
        spark.createDataFrame(lines, "string").write.mode("append").format("text").save("/Users/rocket/Downloads/output")
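The DataFrame route works because DataFrameWriter supports the append save mode, so every batch can go to the same output directory. The text sink expects a single string column, which is why each record is mapped through str first.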

Upvotes: 1
