Reputation: 41
I slightly modified the example from https://github.com/apache/spark/blob/v2.2.0/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
I added a second writeStream (sink):
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

    case class MyWriter1() extends ForeachWriter[Row] {
      override def open(partitionId: Long, version: Long): Boolean = true
      override def process(value: Row): Unit = {
        println(s"custom1 - ${value.get(0)}")
      }
      // close returns Unit, so the body is () rather than true
      override def close(errorOrNull: Throwable): Unit = ()
    }

    // count() produces a Long column, so the tuple is (String, Long)
    case class MyWriter2() extends ForeachWriter[(String, Long)] {
      override def open(partitionId: Long, version: Long): Boolean = true
      override def process(value: (String, Long)): Unit = {
        println(s"custom2 - $value")
      }
      override def close(errorOrNull: Throwable): Unit = ()
    }

    object Main extends Serializable {
      def main(args: Array[String]): Unit = {
        println("starting")
        Logger.getLogger("org").setLevel(Level.OFF)
        Logger.getLogger("akka").setLevel(Level.OFF)
        val host = "localhost"
        val port = "9999"
        val spark = SparkSession
          .builder
          .master("local[*]")
          .appName("app-test")
          .getOrCreate()
        import spark.implicits._
        // Create DataFrame representing the stream of input lines from connection to host:port
        val lines = spark.readStream
          .format("socket")
          .option("host", host)
          .option("port", port)
          .load()
        // Split the lines into words
        val words = lines.as[String].flatMap(_.split(" "))
        // Generate running word count
        val wordCounts = words.groupBy("value").count()
        // Start the first query, which passes the running counts to MyWriter1
        val query1 = wordCounts.writeStream
          .outputMode("update")
          .foreach(MyWriter1())
          .start()
        // Start the second query on a typed view of the same counts
        val ds = wordCounts.map(x => (x.getAs[String]("value"), x.getAs[Long]("count")))
        val query2 = ds.writeStream
          .outputMode("update")
          .foreach(MyWriter2())
          .start()
        spark.streams.awaitAnyTermination()
      }
    }
Unfortunately, only the first query runs; the second never runs (MyWriter2 is never called).
Please advise what I'm doing wrong. According to the docs: "You can start any number of queries in a single SparkSession. They will all be running concurrently sharing the cluster resources."
Upvotes: 4
Views: 5082
Reputation: 693
What you have done is right! Check which scheduler your Spark application is using. Most probably it is the FIFO scheduler (the default), which means the first query can take up all the resources. Change it to the FAIR scheduler and you should be good.
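A minimal sketch of switching the scheduler, assuming the same local setup as in the question. The pool names `pool1` and `pool2` and the object name `FairSchedulerSketch` are made up for illustration, and this may or may not be the cause of the problem described above.

```scala
import org.apache.spark.sql.SparkSession

object FairSchedulerSketch {
  // Builds a session with the FAIR scheduler enabled (the default mode is FIFO).
  def buildSession(): SparkSession =
    SparkSession.builder
      .master("local[*]")
      .appName("app-test")
      .config("spark.scheduler.mode", "FAIR")
      .getOrCreate()

  def main(args: Array[String]): Unit = {
    val spark = buildSession()

    // Jobs submitted from this thread after this call go to the named pool.
    // "pool1" and "pool2" are hypothetical names; pools that are not declared
    // in an allocation file are created with default settings.
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
    // ... build and start query1 here ...

    spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
    // ... build and start query2 here ...

    spark.stop()
  }
}
```

Setting the pool before each `start()` call is what places the two queries in separate pools; with a single shared pool the queries still compete inside it.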
Upvotes: 0
Reputation: 3367
I had the same situation (but on the newer Structured Streaming API), and in my case it helped to call awaitTermination() on the last StreamingQuery.
Something like:
    query1.start()
    query2.start().awaitTermination()
Update: instead of the above, this built-in method is better:
    sparkSession.streams.awaitAnyTermination()
Upvotes: 5
Reputation: 43
Are you using nc -lk 9999 to send data to Spark? Every query creates its own connection to nc, but nc can only send data to the first connection (query). You can write a TCP server instead of nc.
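A rough sketch of such a server, using only the JDK: it accepts any number of connections and sends every line typed on stdin to all of them. The names `BroadcastServer` and `BroadcastServerApp` are hypothetical, and the port 9999 is taken from the question.

```scala
import java.io.{IOException, PrintWriter}
import java.net.ServerSocket
import java.util.concurrent.CopyOnWriteArrayList
import scala.collection.JavaConverters._
import scala.io.StdIn

// Hypothetical helper, not part of Spark: a line-oriented TCP server that
// sends each line to every connected client.
class BroadcastServer(port: Int) {
  private val server = new ServerSocket(port)
  private val clients = new CopyOnWriteArrayList[PrintWriter]()

  // Accept connections in the background so each query gets its own client slot.
  private val acceptor = new Thread(new Runnable {
    override def run(): Unit =
      try {
        while (true) {
          val socket = server.accept()
          // autoFlush = true: println pushes the line out immediately
          clients.add(new PrintWriter(socket.getOutputStream, true))
        }
      } catch {
        case _: IOException => () // server socket was closed
      }
  })
  acceptor.setDaemon(true)
  acceptor.start()

  // The port actually bound (useful when the constructor was given 0).
  def localPort: Int = server.getLocalPort

  def clientCount: Int = clients.size

  // PrintWriter swallows write errors, so disconnected clients are silently skipped.
  def broadcast(line: String): Unit =
    clients.asScala.foreach(_.println(line))

  def close(): Unit = server.close()
}

object BroadcastServerApp {
  def main(args: Array[String]): Unit = {
    val server = new BroadcastServer(9999)
    // Forward stdin to all clients until EOF
    Iterator.continually(StdIn.readLine()).takeWhile(_ != null).foreach(server.broadcast)
    server.close()
  }
}
```

Run it in place of nc, start the Spark application, and type lines into the server's terminal. Lines typed before a query has connected are not replayed to it.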
Upvotes: 2
Reputation: 6871
You are using .awaitAnyTermination(), which returns as soon as the first stream terminates, so the application exits at that point. You have to wait for both streams to finish before you terminate.
Something like this should do the trick:
    query1.awaitTermination()
    query2.awaitTermination()
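If the number of queries is not fixed, the same idea can be sketched with the session's StreamingQueryManager. The object and method names `AwaitAll` and `awaitAllQueries` are made up for this example.

```scala
import org.apache.spark.sql.SparkSession

object AwaitAll {
  // Blocks until every streaming query started on `spark` has terminated.
  // awaitAnyTermination() rethrows the exception of a query that failed,
  // so a failure in any query ends the wait early.
  def awaitAllQueries(spark: SparkSession): Unit = {
    while (spark.streams.active.nonEmpty) {
      spark.streams.awaitAnyTermination()
      // Forget the terminated query so the next call waits for another one
      spark.streams.resetTerminated()
    }
  }
}
```

Calling `AwaitAll.awaitAllQueries(spark)` after both `start()` calls would take the place of the two `awaitTermination()` lines.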
Upvotes: 1