Dmitry B

Reputation: 41

Spark Structured Streaming, multiple queries are not running concurrently

I slightly modified the example taken from here - https://github.com/apache/spark/blob/v2.2.0/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala

I added a second writeStream (sink):

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

case class MyWriter1() extends ForeachWriter[Row]{
  override def open(partitionId: Long, version: Long): Boolean = true

  override def process(value: Row): Unit = {
    println(s"custom1 - ${value.get(0)}")
  }

  override def close(errorOrNull: Throwable): Unit = ()
}

case class MyWriter2() extends ForeachWriter[(String, Long)]{
  override def open(partitionId: Long, version: Long): Boolean = true

  override def process(value: (String, Long)): Unit = {
    println(s"custom2 - $value")
  }

  override def close(errorOrNull: Throwable): Unit = ()
}


object Main extends Serializable{

  def main(args: Array[String]): Unit = {
    println("starting")

    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val host = "localhost"
    val port = "9999"

    val spark = SparkSession
      .builder
      .master("local[*]")
      .appName("app-test")
      .getOrCreate()

    import spark.implicits._

    // Create DataFrame representing the stream of input lines from connection to host:port
    val lines = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .load()

    // Split the lines into words
    val words = lines.as[String].flatMap(_.split(" "))

    // Generate running word count
    val wordCounts = words.groupBy("value").count()

    // Start running the query that prints the running counts to the console
    val query1 = wordCounts.writeStream
      .outputMode("update")
      .foreach(MyWriter1())
      .start()

    // count() produces a LongType column, so read it as Long
    val ds = wordCounts.map(x => (x.getAs[String]("value"), x.getAs[Long]("count")))

    val query2 = ds.writeStream
      .outputMode("update")
      .foreach(MyWriter2())
      .start()

    spark.streams.awaitAnyTermination()

  }
}

Unfortunately, only the first query runs; the second never does (MyWriter2 is never called).

Please advise on what I'm doing wrong. According to the docs: "You can start any number of queries in a single SparkSession. They will all be running concurrently sharing the cluster resources."

Upvotes: 4

Views: 5082

Answers (4)

Sheel Pancholi

Reputation: 693

What you have done is right! Just go ahead and check which scheduler your Spark framework is using. Most probably it is the FIFO scheduler, which means the first query takes up all the resources. Just change it to the FAIR scheduler and you should be good.
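
For illustration, a sketch of that change, assuming the mode is set when the session is built (spark.scheduler.mode is the standard Spark property; everything else mirrors the question's setup):

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .master("local[*]")
  .appName("app-test")
  // FAIR scheduling so jobs from one streaming query
  // do not starve the jobs of the other query
  .config("spark.scheduler.mode", "FAIR")
  .getOrCreate()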

Upvotes: 0

Aydin K.

Reputation: 3367

I had the same situation (but on the newer Structured Streaming API), and in my case it helped to call awaitTermination() on the last streaming query.

Something like:

query1.start()
query2.start().awaitTermination()

Update: Instead of the above, this built-in method is better:

sparkSession.streams.awaitAnyTermination()
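
Note that awaitAnyTermination() unblocks as soon as any one query stops. If you want the driver to stay up until every query has stopped, a common loop pattern (a sketch using only standard StreamingQueryManager calls) is:

// awaitAnyTermination() returns after the *first* query terminates;
// loop and reset until no queries remain active
while (spark.streams.active.nonEmpty) {
  spark.streams.awaitAnyTermination()
  spark.streams.resetTerminated()
}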

Upvotes: 5

oaksharks

Reputation: 43

Are you using nc -lk 9999 to send data to Spark? Every query creates its own connection to nc, but nc can only send data to the first connection (query). You can write a TCP server instead of nc (see the sketch below).
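
A minimal broadcast server along those lines might look like this (an illustrative sketch, not part of the original answer; assumes Scala 2.12+ for the lambda-to-Runnable conversion and the question's port 9999):

import java.io.PrintWriter
import java.net.ServerSocket
import scala.collection.mutable.ListBuffer
import scala.io.StdIn

// Accepts any number of client connections (one per streaming query)
// and broadcasts every line typed on stdin to all of them,
// unlike nc, which serves only the first connection.
object BroadcastServer {
  def main(args: Array[String]): Unit = {
    val server  = new ServerSocket(9999)
    val clients = ListBuffer.empty[PrintWriter]

    // Accept connections on a background thread
    new Thread(() => {
      while (true) {
        val socket = server.accept()
        clients.synchronized {
          clients += new PrintWriter(socket.getOutputStream, true)
        }
      }
    }).start()

    // Read lines from stdin and send each one to every connected client
    Iterator.continually(StdIn.readLine()).takeWhile(_ != null).foreach { line =>
      clients.synchronized {
        clients.foreach(_.println(line))
      }
    }
  }
}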

Upvotes: 2

raam86

Reputation: 6871

You are using .awaitAnyTermination(), which will terminate the application as soon as the first stream finishes; you have to wait for both streams to finish before you terminate.

Something like this should do the trick:

query1.awaitTermination()
query2.awaitTermination()

Upvotes: 1
