Pravesh Jain
Pravesh Jain

Reputation: 4288

Trying to run SparkSQL over Spark Streaming

I am trying to run SQL queries over streaming data in Spark. This looks pretty straightforward, but when I try it, I get the error "table not found: <tablename>". It is unable to find the table I've registered.

Using Spark SQL with batch data works fine, so I think it has to do with how I'm calling streamingContext.start(). Any idea what the issue is? Here is the code:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SQLContext

// Repro from the question: each streamed batch is registered as the SQL table "data",
// but the query against that table is issued outside foreachRDD, during setup.
object Streaming {

// Entry point; `args` is not used.
def main(args: Array[String]) {

    val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
    val sc = new SparkContext(sparkConf)
    // Streaming context on top of the same SparkContext, created with Seconds(2)
    val ssc = new StreamingContext(sc, Seconds(2))

    // SQL context shares the SparkContext; the import below brings createSchemaRDD
    // into scope, which is what lets registerAsTable be called on the mapped RDD.
    val sqc = new SQLContext(sc);
    import sqc.createSchemaRDD

    // Text stream read from the given path (no word counting actually happens here,
    // despite the app name).
    val lines = ssc.textFileStream("C:/Users/pravesh.jain/Desktop/people.txt")
    // Per batch: split each line on ",", build Persons(first field, second field as Int)
    // and register the result as table "data". This closure only runs once the
    // streaming context has been started and a batch arrives.
    // NOTE(review): Persons is not defined in this snippet; presumably a case class
    // with `name` and `age` fields, given the query below.
    lines.foreachRDD(rdd=>rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).registerAsTable("data"))
    // lines.foreachRDD(rdd=>rdd.foreach(println))
    // This query is issued here, before ssc.start() and outside foreachRDD, i.e. before
    // any batch has had a chance to register "data"; `teenagers` is never used afterwards.
    val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19")
    ssc.start()
    ssc.awaitTermination()
  }
}

Any suggestions welcome. Thanks.

Upvotes: 6

Views: 4866

Answers (1)

Pravesh Jain
Pravesh Jain

Reputation: 4288

Well, I found the problem. You have to query the data within the foreachRDD function; otherwise the table is not recognized. Something like this works:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.Duration

/**
 * Runs a Spark SQL query over every micro-batch of a text stream.
 *
 * Each incoming line is expected to look like `name,age`. For every batch the lines
 * are parsed into [[Mlist.Persons]] rows, registered as the SQL table "data", and the
 * names of all rows with an age between 13 and 19 (inclusive) are printed.
 *
 * The query is issued inside `foreachRDD` on purpose: the table "data" only exists
 * once a batch has been registered, so querying it during setup (before the streaming
 * context is started) fails with "table not found".
 */
object Mlist {

  // Imported locally so this object compiles on its own; the surrounding import list
  // does not bring SparkConf / SparkContext into scope.
  import org.apache.spark.{SparkConf, SparkContext}

  /**
   * Row schema for the "data" table: one person per input line.
   *
   * Declared at object level rather than inside `main`; case classes local to a
   * method are known to break the reflection-based schema inference used by
   * `createSchemaRDD`.
   *
   * @param name first comma-separated field of the line
   * @param age  second comma-separated field, trimmed and parsed as an integer
   */
  case class Persons(name: String, age: Int)

  /**
   * Sets up the streaming pipeline and blocks until the streaming context terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
    val sc = new SparkContext(sparkConf)
    // Streaming context sharing the SparkContext, with a 2-second batch interval.
    val ssc = new StreamingContext(sc, Seconds(2))

    // NOTE(review): textFileStream is documented to monitor a directory for newly
    // created files; this path ends in a file name -- confirm it points at a directory.
    val lines = ssc.textFileStream("C:/Users/pravesh.jain/Desktop/people.txt")

    // Debug output: echo the raw lines of every batch.
    lines.foreachRDD(rdd => rdd.foreach(println))

    val sqc = new SQLContext(sc)
    // Implicit conversion that lets an RDD of case-class rows be registered as a table.
    import sqc.createSchemaRDD

    lines.foreachRDD { rdd =>
      // (Re-)register the current batch as "data" ...
      rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).registerAsTable("data")
      // ... and query it while the registration is guaranteed to exist.
      val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19")
      teenagers.foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Upvotes: 12

Related Questions