Reputation: 1233
The DataFrame and SQL Operations section of the official Spark Streaming Programming Guide mentions running SQL queries asynchronously:
You can also run SQL queries on tables defined on streaming data from a different thread (that is, asynchronous to the running StreamingContext).
Are there any examples or samples that show how to do it?
Upvotes: 3
Views: 262
Reputation: 74679
It's interesting that the docs mention this at all, since any temporary table is accessible from any thread that uses the same SparkSession.
I would go about this as follows:
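import java.util.concurrent.Executors
import org.apache.spark.sql._

// Create a fixed thread pool to execute asynchronous tasks
val executorService = Executors.newFixedThreadPool(1)

// Assumes dstream is a DStream[String], so that toDF("record") applies
dstream.foreachRDD { rdd =>
  // Reuse (or lazily create) the session that shares the streaming SparkContext
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate
  import spark.implicits._
  import spark.sql

  // Expose the current batch as a temporary view; each batch replaces the previous one
  val records = rdd.toDF("record")
  records.createOrReplaceTempView("records")

  // Submit an asynchronous task to execute a SQL query
  executorService.submit {
    new Runnable {
      override def run(): Unit = {
        sql("select * from records").show(truncate = false)
      }
    }
  }
}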
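To check the point about thread visibility in isolation, here is a minimal sketch without any streaming. It registers a temporary view on one thread and queries it from a pool thread through the same SparkSession. The object name TempViewAcrossThreads, the helper countFromOtherThread, the local[2] master, and the sample rows are all assumptions made for illustration; none of them come from the guide.

```scala
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

import org.apache.spark.sql.SparkSession

// Hypothetical helper object, used only for this illustration.
object TempViewAcrossThreads {

  // Registers a temporary view on the calling thread, then counts its rows
  // from a separate pool thread that reuses the same SparkSession.
  def countFromOtherThread(spark: SparkSession): Long = {
    import spark.implicits._

    // Sample data (an assumption); in the streaming case this would be a batch RDD.
    Seq("a", "b", "c").toDF("record").createOrReplaceTempView("records")

    val pool = Executors.newFixedThreadPool(1)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

    try {
      // The query body runs on the pool thread, not on the caller's thread.
      val count = Future {
        spark.sql("select count(*) from records").first().getLong(0)
      }
      Await.result(count, 30.seconds)
    } finally {
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    // Local master chosen only so the sketch runs on a single machine.
    val spark = SparkSession.builder
      .master("local[2]")
      .appName("temp-view-across-threads")
      .getOrCreate()

    try println(countFromOtherThread(spark))
    finally spark.stop()
  }
}
```

In a streaming job the same idea applies, with the caveat that the view is replaced on every batch, so a query running on another thread sees whichever batch was registered most recently when it executes.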
Upvotes: 3