N_C

Reputation: 992

How to start multiple streaming queries in a single Spark application?

I have built a few Spark Structured Streaming queries to run on EMR. They are long-running ETL queries that need to run at all times. When I submit a job to the YARN cluster on EMR, I can submit only a single Spark application, so that application has to contain multiple streaming queries.

I am confused about how to build and start multiple streaming queries programmatically within the same spark-submit.

For example, I have this code:

case class SparkJobs(prop: Properties) extends Serializable {
  def run() = {
      Type1SparkJobBuilder(prop).build().awaitTermination()
      Type1SparkJobBuilder(prop).build().awaitTermination()
  }
}

I fire this in my main class with SparkJobs(new Properties()).run()

When I look at the Spark history server, only the first streaming job (Type1SparkJob) is running.

What is the recommended way to start multiple streaming queries programmatically within the same spark-submit? I could not find proper documentation on this either.

Upvotes: 19

Views: 10908

Answers (2)

Silvio

Reputation: 4197

Since you're calling awaitTermination on the first query, it blocks until that query completes, so the second query is never started. Instead, start both queries first, and then use StreamingQueryManager.awaitAnyTermination.

val query1 = df.writeStream.start()
val query2 = df.writeStream.start()

spark.streams.awaitAnyTermination()
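
To make the snippet above concrete, here is a minimal sketch of a complete application. The object name, the query names, and the use of the built-in rate source and console sink are placeholders for illustration, not the asker's actual jobs; it also assumes the master is supplied by spark-submit.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical application object; the names here are illustrative only.
object MultiQueryApp {

  // Starts two independent queries and returns them without blocking.
  def startQueries(spark: SparkSession): Seq[StreamingQuery] = {
    // The built-in "rate" source stands in for a real input such as Kafka.
    val input = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()

    val q1 = input.writeStream
      .queryName("etl_one")
      .format("console")
      .start() // returns immediately; the query runs in the background

    val q2 = input.selectExpr("value * 2 AS doubled").writeStream
      .queryName("etl_two")
      .format("console")
      .start()

    Seq(q1, q2)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("multi-query").getOrCreate()
    startQueries(spark)
    // Block the driver only after every query has been started.
    spark.streams.awaitAnyTermination()
  }
}
```

Note that awaitAnyTermination returns as soon as any one query stops, and rethrows the exception if that query failed, so the driver notices a failure in either query rather than only in the one it happened to be waiting on.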

In addition to the above, Spark uses the FIFO scheduler by default, which means the first query gets priority on all resources in the cluster while it's executing. Since you're trying to run multiple queries concurrently, you should switch to the FAIR scheduler by setting spark.scheduler.mode to FAIR.

If you have some queries that should have more resources than the others then you can also tune the individual scheduler pools.
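
As a rough sketch of the scheduler advice, the example below enables FAIR mode and assigns each query to its own pool by setting the spark.scheduler.pool local property on the thread that starts it. The pool names, query names, and the startInPool helper are hypothetical, and the rate source and console sink are stand-ins for real inputs and outputs. Pools that are not defined in an allocation file (spark.scheduler.allocation.file) get default settings, so per-pool weights would be tuned in that file.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical application object; pool and query names are illustrative.
object FairPoolsApp {

  // Starts one query after pointing the current thread at the given pool.
  def startInPool(spark: SparkSession, input: DataFrame,
                  pool: String, name: String): StreamingQuery = {
    // Jobs launched by a query started from this thread use this pool.
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
    input.writeStream.queryName(name).format("console").start()
  }

  // Builds a session with the FAIR scheduler enabled.
  def buildSession(configure: SparkSession.Builder => SparkSession.Builder): SparkSession =
    configure(SparkSession.builder().appName("fair-pools"))
      .config("spark.scheduler.mode", "FAIR")
      .getOrCreate()

  def main(args: Array[String]): Unit = {
    val spark = buildSession(identity) // master comes from spark-submit
    val input = spark.readStream.format("rate").load()

    startInPool(spark, input, "pool_a", "query_a")
    startInPool(spark, input, "pool_b", "query_b")

    spark.streams.awaitAnyTermination()
  }
}
```

The scheduler mode has to be set before the SparkContext is created, which is why it is passed to the session builder (or via --conf on spark-submit) rather than changed at runtime.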

Upvotes: 43

dunlu_98k

Reputation: 309

val query1 = ds.writeStream.{...}.start()

val query2 = ds.writeStream.{...}.start()

val query3 = ds.writeStream.{...}.start()

query3.awaitTermination()

awaitTermination() blocks your process until the query finishes, which never happens in a streaming app. Calling it only on your last query should fix your problem.

Upvotes: -5
