Reputation: 810
I have the following Spark streaming example:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("Name").setMaster("local")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(2))
val directoryStream = ssc.textFileStream("""file:///C:/Users/something/something""")
directoryStream.foreachRDD(rdd => {
  println(rdd.count())
})
ssc.start()
ssc.awaitTermination()
Even when the folder is empty, it keeps printing 0 every 2 seconds, as if an empty file were in the folder. I would like it to enter the foreachRDD body only when a new file is present in the folder. Is there something I'm doing wrong?
I am using Spark 1.6 and Scala 2.10.7.
Upvotes: 1
Views: 844
Reputation: 1175
Since your batch duration is 2 seconds, the job triggers every 2 seconds regardless of input: the triggering point is the batch interval, not data availability. If data is present at that moment, the DStream's RDD contains it; otherwise the RDD is empty. Use the code below to check for this:
dstream.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    // do something
  }
}
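Applied to the snippet from the question, the guard could look like the sketch below (the object name is arbitrary and the directory path is the question's placeholder, not a real path):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object EmptyBatchGuard {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Name").setMaster("local")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(2))

    // Placeholder path from the question: replace with the directory you monitor.
    val directoryStream = ssc.textFileStream("""file:///C:/Users/something/something""")

    directoryStream.foreachRDD { rdd =>
      // A batch still fires every 2 seconds, but the body below runs
      // only when the batch's RDD actually contains data from a new file.
      if (!rdd.isEmpty) {
        println(rdd.count())
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Note that `isEmpty` is cheaper than `count() == 0`, since it can stop after finding a single element instead of scanning every partition.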
Upvotes: 5