Reputation: 4288
I am trying to run SQL queries over streaming data in Spark. This looks pretty straightforward, but when I try it, I get the error `table not found: tablename`. It is unable to find the table I've registered.
Using Spark SQL with batch data works fine, so I'm thinking it has to do with how I'm calling streamingContext.start(). Any idea what the issue is? Here is the code:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SQLContext

// Case class describing the schema of each record
case class Persons(name: String, age: Int)

object Streaming {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
    val sc = new SparkContext(sparkConf)
    // Create the streaming context with a 2-second batch interval
    val ssc = new StreamingContext(sc, Seconds(2))
    val sqc = new SQLContext(sc)
    import sqc.createSchemaRDD

    // Create the FileInputDStream on the directory and register
    // each new batch of records as a table
    val lines = ssc.textFileStream("C:/Users/pravesh.jain/Desktop/people.txt")
    lines.foreachRDD(rdd =>
      rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).registerAsTable("data"))

    val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19")

    ssc.start()
    ssc.awaitTermination()
  }
}
Any suggestions welcome. Thanks.
Upvotes: 6
Views: 4866
Reputation: 4288
Well, I figured out the problem: you have to register and query the table within the foreachRDD function, otherwise the table is not recognized outside of it. Something like this works:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SQLContext

// Case class describing the schema of each record
case class Persons(name: String, age: Int)

object Mlist {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
    val sc = new SparkContext(sparkConf)
    // Create the streaming context with a 2-second batch interval
    val ssc = new StreamingContext(sc, Seconds(2))

    val sqc = new SQLContext(sc)
    import sqc.createSchemaRDD

    // Create the FileInputDStream on the directory; register AND
    // query the table inside foreachRDD, once per batch
    val lines = ssc.textFileStream("C:/Users/pravesh.jain/Desktop/people.txt")
    lines.foreachRDD { rdd =>
      rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).registerAsTable("data")
      val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19")
      teenagers.foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
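For intuition, the per-batch transformation plus the SQL query boil down to parsing "name,age" lines and filtering on age. A minimal plain-Scala sketch of that logic (no Spark; `TeenFilter` and the sample input are hypothetical names, not from the post):

```scala
// Sketch of the per-batch logic: parse "name,age" lines and keep the
// names of people whose age is between 13 and 19 inclusive, mirroring
// SELECT name FROM data WHERE age >= 13 AND age <= 19.
object TeenFilter {
  def teenagers(lines: Seq[String]): Seq[String] =
    lines
      .map(_.split(","))
      .collect { case Array(name, age) => (name.trim, age.trim.toInt) }
      .filter { case (_, a) => a >= 13 && a <= 19 }
      .map(_._1)
}
```

In the streaming job this same computation runs once per 2-second batch, which is why the registration and the query must both happen inside foreachRDD.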
Upvotes: 12