Mayur

Reputation: 124

Spark streaming: How to write cumulative output?

I have to write a single output file for my streaming job.

Question: when will my job actually stop? I killed the server, but that did not stop it. I want to stop the job from the command line, if that is possible.

Code:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object MAYUR_BELDAR_PROGRAM5_V1 {

      def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
        val ssc = new StreamingContext(conf, Seconds(10))

        // args(0) is the port of the socket server to read from
        val lines = ssc.socketTextStream("localhost", args(0).toInt)
        // Drop empty tokens so that charAt(0) below cannot throw
        val words = lines.flatMap(_.split(" ")).filter(_.nonEmpty)

        // Classify by the parity of the first character's code and of the word length
        val class1 = words.filter(a => a.charAt(0).toInt % 2 == 0 && a.length % 2 == 0)
        val class2 = words.filter(a => a.charAt(0).toInt % 2 == 0 && a.length % 2 == 1)
        val class3 = words.filter(a => a.charAt(0).toInt % 2 == 1 && a.length % 2 == 0)
        val class4 = words.filter(a => a.charAt(0).toInt % 2 == 1 && a.length % 2 == 1)

        // Each call writes a separate output for every 10-second batch
        class1.saveAsTextFiles("hdfs://hadoop1:9000/mbeldar/class1", "txt")
        class2.saveAsTextFiles("hdfs://hadoop1:9000/mbeldar/class2", "txt")
        class3.saveAsTextFiles("hdfs://hadoop1:9000/mbeldar/class3", "txt")
        class4.saveAsTextFiles("hdfs://hadoop1:9000/mbeldar/class4", "txt")

        ssc.start()             // Start the computation
        ssc.awaitTermination()  // Blocks until the context is stopped or fails
        ssc.stop()
      }
    }

Upvotes: 0

Views: 288

Answers (1)

Constantino Cronemberger

Reputation: 2261

A stream by definition has no end, so the job will not stop unless you call the method that stops it. In my case a business condition tells me when processing is finished, and when it is reached I call JavaStreamingContext.close(). I also have a monitor that checks whether the process has received any data in the past few minutes; if it has not, the monitor closes the stream as well.
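One way to get a stop condition that can be triggered from the command line is to poll for a marker file. The sketch below is only an illustration of that idea and is written without a Spark dependency so it can be run on its own; the object name `StopOnMarker`, the function `runUntilMarker`, and the marker path are all hypothetical.

```scala
import java.nio.file.{Files, Path}

object StopOnMarker {
  /** Polls until the job ends on its own or the marker file appears.
    *
    * @param marker         file whose existence means "please stop"
    * @param pollMillis     how long each wait lasts
    * @param awaitOrTimeout waits up to the given milliseconds and returns
    *                       true if the job has already terminated
    * @param stop           shuts the job down
    * @return true if this loop requested the stop, false if the job ended by itself
    */
  def runUntilMarker(marker: Path, pollMillis: Long,
                     awaitOrTimeout: Long => Boolean, stop: () => Unit): Boolean = {
    var requested = false
    var terminated = false
    while (!terminated && !requested) {
      terminated = awaitOrTimeout(pollMillis)
      if (!terminated && Files.exists(marker)) {
        stop()
        requested = true
      }
    }
    requested
  }
}
```

In a Scala streaming job, this could replace the plain `awaitTermination()` call after `ssc.start()`, assuming `awaitOrTimeout` is given as `ssc.awaitTerminationOrTimeout(_)` and `stop` as `() => ssc.stop(stopSparkContext = true, stopGracefully = true)`. Creating the marker file (for example with `touch`) would then end the job after the next poll.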

To accumulate data across batches, use the method updateStateByKey, which is available on a pair DStream (a DStream of key/value pairs). This method requires checkpointing to be enabled.

I checked the Spark source and found that saveAsTextFiles uses foreachRDD, so it saves each RDD separately and earlier RDDs are not taken into account. With updateStateByKey the job will still write multiple files, but each file will reflect all RDDs that were processed before it.
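The difference between per-batch and cumulative output can be sketched in plain Scala, without Spark. In the sketch, `updateCount` has the same shape as the function that `updateStateByKey` expects (the new values for one key plus the previous state), and `applyBatch` is a hypothetical stand-in for what Spark does with it on every batch. The object and function names are assumptions made for this illustration.

```scala
object CumulativeCounts {
  /** Combines this batch's values for one key with the previous state. */
  def updateCount(newValues: Seq[Int], running: Option[Int]): Option[Int] =
    Some(running.getOrElse(0) + newValues.sum)

  /** Folds one batch of words into the running state, key by key.
    * Keys without new values are still updated, with an empty Seq. */
  def applyBatch(state: Map[String, Int], batch: Seq[String]): Map[String, Int] = {
    // Turn the batch into (word -> Seq(1, 1, ...)) pairs
    val perKey = batch.groupBy(identity).map { case (w, ws) => w -> ws.map(_ => 1) }
    val keys = state.keySet ++ perKey.keySet
    keys.flatMap { k =>
      updateCount(perKey.getOrElse(k, Seq.empty), state.get(k)).map(k -> _)
    }.toMap
  }
}
```

Applying `applyBatch` to the batches `Seq("a", "b", "a")` and then `Seq("b", "c")` yields a state that counts `b` twice, whereas counting each batch on its own (as saveAsTextFiles effectively does) would report `b` once per batch. In a real job the equivalent would be along the lines of `words.map(w => (w, 1)).updateStateByKey(updateCount _)`, with a checkpoint directory set through `ssc.checkpoint(...)` beforehand.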

Upvotes: 1
