Reputation: 18108
Puzzled by a piece of code I borrowed from the internet for research purposes. This is the code:
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable

val spark = ...
val sc = spark.sparkContext
val ssc = new StreamingContext(spark.sparkContext, Seconds(1))

val rddQueue = new mutable.Queue[RDD[Char]]()
val QS = ssc.queueStream(rddQueue)

QS.foreachRDD(q => {
  print("Hello") // Queue never exhausted
  if (!q.isEmpty) {
    // ... do something
    // ... do something
  }
})

// ssc.checkpoint("/chkpoint/dir") // if uncommented, causes a serialization error

ssc.start()
for (c <- 'a' to 'c') {
  rddQueue += ssc.sparkContext.parallelize(List(c))
}
ssc.awaitTermination()
I was tracing through it just to check and noticed that "Hello" is printed forever:
HelloHelloHelloHelloHelloHelloHelloHelloHelloHello and so on
I would have thought the queueStream would be exhausted after 3 iterations.
So, what have I missed?
Upvotes: 1
Views: 321
Reputation: 18108
Got it. The queue actually is exhausted, but the micro-batch loop continues, and that is why the guard

if (!q.isEmpty)

is there.
OK, I would have thought it would just stop, or rather not execute, but not so. I remember now: foreachRDD fires on every batch interval, so if nothing was streamed during an interval, an empty RDD is delivered. Leaving this up for others, as there was an upvote.
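For what it's worth, here is a minimal self-contained sketch of one way to make it terminate (the local master, the sawData/drained flags, and the println body are my own stand-ins, not part of the original code): it keeps the emptiness guard, and stops the context once the queue is drained and an empty batch arrives.

import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable

val spark = SparkSession.builder().master("local[2]").appName("QueueStreamDemo").getOrCreate()
val ssc = new StreamingContext(spark.sparkContext, Seconds(1))

val rddQueue = new mutable.Queue[RDD[Char]]()
val QS = ssc.queueStream(rddQueue) // oneAtATime = true by default: one queued RDD per batch

// foreachRDD runs its function on the driver, so driver-side
// @volatile flags are visible to the main thread below.
@volatile var sawData = false
@volatile var drained = false

QS.foreachRDD { q =>
  if (!q.isEmpty) {
    sawData = true
    q.collect().foreach(println) // stand-in for "do something"
  } else if (sawData && rddQueue.isEmpty) {
    drained = true // we saw data, and both the batch and the queue are now empty
  }
}

ssc.start()
for (c <- 'a' to 'c') {
  rddQueue += ssc.sparkContext.parallelize(List(c))
}
while (!drained) Thread.sleep(200)
ssc.stop(stopSparkContext = false) // keep the SparkSession alive

With a 1-second batch interval this prints a, b and c over the first three batches, then stops on the first empty batch instead of looping forever.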
However, even though Spark Streaming (DStreams) is legacy, it is a bad example, because adding a checkpoint

ssc.checkpoint("/chkpoint/dir")

causes a serialization error: enabling checkpointing makes Spark serialize the DStream graph at start-up, and QueueInputDStream deliberately throws a NotSerializableException because queueStream does not support checkpointing. Leaving it for the benefit of others.
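For completeness, a minimal sketch of that failure mode, reusing the queue-backed stream from above: once checkpointing is enabled, start() fails while validating the DStream graph.

ssc.checkpoint("/chkpoint/dir")
val QS = ssc.queueStream(rddQueue)
QS.foreachRDD(q => if (!q.isEmpty) q.collect().foreach(println))
ssc.start() // throws a NotSerializableException:
            // queueStream does not support checkpointing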
Upvotes: 0