Guo

Reputation: 1813

How to use updateStateByKey() with socketTextStream in Spark Streaming?

1st test:

The test code is below:

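import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamTest {

    def main(args: Array[String]): Unit = {
        // Master and app name are expected to come from spark-submit.
        val sc = new SparkContext
        val ssc = new StreamingContext(sc, Seconds(1))
        // updateStateByKey requires a checkpoint directory.
        ssc.checkpoint("./checkpoint")

        val lines = ssc.socketTextStream("192.168.11.5", 9999, StorageLevel.MEMORY_ONLY_SER)
        // Key each line by its text; the value is an unused placeholder.
        val accStream = lines.map((_ , "")).updateStateByKey(updateFunc)
        accStream.print()
        ssc.start()
        ssc.awaitTermination()
    }
    // Ignores both the new values and the previous state; always keeps state 1.
    def updateFunc: (Seq[String], Option[Int]) => Option[Int] = { case _ => Some(1) }
}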

When I send a single record (only one) with NetCat, see screenshot: [screenshot]

The result is:

[screenshot]

My question is: why is the result printed in every batch rather than just once? (I sent only one record through the socket.)

2nd test:

I tested again, this time with the Spark Streaming batch interval set to 5 seconds.

Data sent:

[screenshot]

The result is:

[screenshot]

3rd test:

Using ConstantInputDStream for testing, code below:

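import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ConstantInputDStream

object SparkStreaming {
    def main(args: Array[String]): Unit = {
        // Master and app name are expected to come from spark-submit.
        val sc = new SparkContext
        val ssc = new StreamingContext(sc, Seconds(1))
        // updateStateByKey requires a checkpoint directory.
        ssc.checkpoint("./checkpoint")
        val seq = Seq("key")   // every 1 second send a "key"
        val rdd = ssc.sparkContext.parallelize(seq)
        // using ConstantInputDStream as inputDStream: it emits the same RDD in every batch
        val inputDStream = new ConstantInputDStream(ssc, rdd)

        val map = inputDStream.map((_, "")).updateStateByKey(updateFunc)
        map.print()
        ssc.start()
        ssc.awaitTermination()
    }

    // Ignores both the new values and the previous state; always keeps state 1.
    def updateFunc: (Seq[String], Option[Int]) => Option[Int] = { case _ => Some(1) }
}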

The result is:

[screenshot]

The result of the 3rd test is the same as the result of the 1st test.

In the 1st test, I sent "key" only once, in the 1st second.

In the 3rd test, ConstantInputDStream sends "key" every second.

So why are the results the same? The result with socketTextStream seems really odd.

Could you tell me why? Thank you very much!

Upvotes: 0

Views: 1409

Answers (2)

Alex Larikov

Reputation: 754

The whole point of updateStateByKey is to save and accumulate state across batches. After updateStateByKey, your stream is a set of tuples pairing each key with the value returned by your update function. It keeps the state for a key until your update function returns None instead of Some for that key.

You can refer to the example implementation in this answer: How to process a subset of input records in a batch, i.e. the first second in 3-sec batch time?
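
As a minimal sketch of the "return None" behaviour described above: the update function below is a hypothetical variant (the names UpdateFuncs and countOrExpire are illustrative, not from the question). It counts records per key but returns None when a batch brings no new values for that key, which removes the key from the state. It has the same (Seq[String], Option[Int]) => Option[Int] shape as the question's updateFunc and has no Spark dependency, so it can be tried in isolation.

```scala
object UpdateFuncs {
  // Hypothetical update function: accumulate a count per key, but drop the key
  // as soon as a batch arrives with no new values for it.
  def countOrExpire(newValues: Seq[String], state: Option[Int]): Option[Int] =
    if (newValues.isEmpty) None                      // no new data: remove the key's state
    else Some(state.getOrElse(0) + newValues.size)   // new data: add to the running count
}
```

Assuming the question's setup, it could be passed as lines.map((_, "")).updateStateByKey(UpdateFuncs.countOrExpire _). A key would then appear in the output only for batches in which it received data, and its count would start over after each removal.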

Upvotes: 1

Sumit

Reputation: 1420

That is because updateStateByKey saves your state, and the saved state does not change unless new data is processed.
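
A simplified, Spark-free model may make this concrete. It assumes, as the Spark Streaming programming guide describes, that the update function is applied in every batch to every key that already has state, whether or not new data arrived for it. The names StateSim, step and alwaysOne are hypothetical and exist only for this sketch; it is not how Spark implements the operation internally.

```scala
object StateSim {
  // Simplified model of one updateStateByKey step: the update function runs for
  // every key that has new values in this batch OR already has saved state.
  def step[V, S](state: Map[String, S], batch: Seq[(String, V)])(
      update: (Seq[V], Option[S]) => Option[S]): Map[String, S] = {
    // Group this batch's values by key.
    val grouped: Map[String, Seq[V]] =
      batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    val keys = state.keySet ++ grouped.keySet
    // Keys whose update returns None are dropped from the new state.
    keys.toSeq.flatMap { k =>
      update(grouped.getOrElse(k, Seq.empty), state.get(k)).map(s => k -> s)
    }.toMap
  }

  // Same behaviour as the question's updateFunc: always keep state 1.
  val alwaysOne: (Seq[String], Option[Int]) => Option[Int] = (_, _) => Some(1)

  // Batch 1 carries one record; batch 2 is empty.
  val afterBatch1: Map[String, Int] = step(Map.empty[String, Int], Seq("key" -> ""))(alwaysOne)
  val afterBatch2: Map[String, Int] = step(afterBatch1, Seq.empty[(String, String)])(alwaysOne)
}
```

In this model, one record in the first batch followed by empty batches yields the same state in every batch as a record arriving in every batch, which is consistent with the 1st and 3rd tests printing the same output.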

Upvotes: 1
