Reputation: 1813
The code I'm testing is below:
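import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamTest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext
    val ssc = new StreamingContext(sc, Seconds(1)) // 1-second batch interval
    ssc.checkpoint("./checkpoint") // updateStateByKey requires a checkpoint directory
    val lines = ssc.socketTextStream("192.168.11.5", 9999, StorageLevel.MEMORY_ONLY_SER)
    val accStream = lines.map((_, "")).updateStateByKey(updateFunc)
    accStream.print()
    ssc.start()
    ssc.awaitTermination()
  }

  // Always returns Some(1), so a key's state is never removed.
  def updateFunc: (Seq[String], Option[Int]) => Option[Int] = { case _ => Some(1) }
}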
When I send one line of data (only one) with NetCat, see screenshot:
The result is:
My question is: why is the result printed all the time rather than just once? (I sent only one line of data over the socket.)
I tested again with the Spark Streaming batch interval set to 5 seconds:
Sent data:
The result is:
Using ConstantInputDStream for testing, code below:
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ConstantInputDStream

object SparkStreaming {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext
    val ssc = new StreamingContext(sc, Seconds(1))
    ssc.checkpoint("./checkpoint")
    val seq = Seq("key") // every 1 second send a "key"
    val rdd = ssc.sparkContext.parallelize(seq)
    // using ConstantInputDStream as inputDStream
    val inputDStream = new ConstantInputDStream(ssc, rdd)
    val map = inputDStream.map((_, "")).updateStateByKey(updateFunc)
    map.print()
    ssc.start()
    ssc.awaitTermination()
  }

  def updateFunc: (Seq[String], Option[Int]) => Option[Int] = { case _ => Some(1) }
}
The result is:
The result of the 3rd test is the same as the result of the 1st test.
In the 1st test, I sent "key" only once, in the 1st second.
In the 3rd test, ConstantInputDStream sends "key" every second.
So why are the results the same? The result with socketTextStream looks really odd.
Could you tell me why? Thank you very much!
Upvotes: 0
Views: 1409
Reputation: 754
The whole point of updateStateByKey is to save and accumulate state when needed. After updateStateByKey, your stream is a set of tuples made up of keys and the values returned from your update function. It keeps the state of a key until you return None from your update function instead of Some.
You can refer to the example implementation in this answer: How to process a subset of input records in a batch, i.e. the first second in 3-sec batch time?
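To illustrate the point about returning None, here is a minimal sketch of an update function that drops a key's state in any batch where no new values arrive for it. The names UpdateFuncSketch and dropWhenIdle are hypothetical and not from the question, and keeping a running count is just one possible choice of state.

```scala
object UpdateFuncSketch {
  // Hypothetical replacement for the question's updateFunc.
  // newValues: values that arrived for this key in the current batch.
  // state: the value kept from earlier batches, if any.
  def dropWhenIdle(newValues: Seq[String], state: Option[Int]): Option[Int] =
    if (newValues.isEmpty) None                     // no new data: remove the key from the state
    else Some(state.getOrElse(0) + newValues.size)  // otherwise keep a running count
}
```

Assuming the same pipeline as in the question, it could be passed as lines.map((_, "")).updateStateByKey(UpdateFuncSketch.dropWhenIdle _). A key would then show up in the output only for batches in which it actually received data.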
Upvotes: 1
Reputation: 1420
That is because updateStateByKey saves your state and does not update it unless new data is processed.
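The behaviour can be reproduced without Spark. The sketch below is a simplified model of the state handling, not Spark's implementation: it assumes the update function is called once per batch for every key that has either new values or saved state, and that a key is kept whenever the function returns Some. StateSimulation, step and alwaysOne are hypothetical names.

```scala
object StateSimulation {
  type Update = (Seq[String], Option[Int]) => Option[Int]

  // Same update function as in the question: always returns Some(1).
  val alwaysOne: Update = { case _ => Some(1) }

  // Simplified model of one batch: call the update function for every key
  // that has new values or existing state, and keep the keys that return Some.
  def step(state: Map[String, Int], batch: Seq[(String, String)], f: Update): Map[String, Int] = {
    val incoming = batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    (state.keySet ++ incoming.keySet).flatMap { k =>
      f(incoming.getOrElse(k, Seq.empty), state.get(k)).map(k -> _)
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    // "key" arrives only in the first batch; the next two batches are empty.
    val batches = Seq(Seq("key" -> ""), Seq.empty, Seq.empty)
    // Prints Map(key -> 1) for each of the three batches.
    batches.scanLeft(Map.empty[String, Int])(step(_, _, alwaysOne)).tail.foreach(println)
  }
}
```

Under this model the saved state for "key" is emitted on every batch, whether the key arrived once (as with the socket test) or on every batch (as with ConstantInputDStream), which matches the identical results in the question.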
Upvotes: 1