Ged
Ged

Reputation: 18108

Spark QueueStream never exhausted

Puzzled on a piece of code I borrowed from the internet for research purposes. This is the code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable

val spark = ... 

val sc = spark.sparkContext
val ssc = new StreamingContext(spark.sparkContext, Seconds(1)) 

val rddQueue = new mutable.Queue[RDD[Char]]()
val QS = ssc.queueStream(rddQueue) 

QS.foreachRDD(q=> {
   print("Hello") // Queue never exhausted
   if(!q.isEmpty) {
       ... do something
       ... do something
   }
}
)

//ssc.checkpoint("/chkpoint/dir") if unchecked causes Serialization error

ssc.start()
for (c <- 'a' to 'c') {
    rddQueue += ssc.sparkContext.parallelize(List(c))
}
ssc.awaitTermination()

I was tracing through it just to check and noted that "hello" is being printed out forever:

 HelloHelloHelloHelloHelloHelloHelloHelloHelloHello and so on

I would have thought the queueStream would exhaust after 3 iterations.

So, what have I missed?

Upvotes: 1

Views: 321

Answers (1)

Ged
Ged

Reputation: 18108

Got it. It is actually exhausted, but the looping continues and that's why the statement

 if(!q.isEmpty)

is there.

OK, would have thought it would just stop, or, rather not execute, but not so. I remember now. An empty RDD will result, if nothing streamed, based on timing of batch interval. Leaving for others as there was an upvote.

However, even though legacy, it is a bad example as adding checkpoint causes a Serialization error. Leaving it for the benefit of others.

ssc.checkpoint("/chkpoint/dir")

Upvotes: 0

Related Questions