shivali

Reputation: 457

Why is a streaming query still up and running after StreamingQueryManager.awaitAnyTermination?

I want to terminate a Spark mapping (a streaming job) after a specific time, and I'm using sqlContext.streams.awaitAnyTermination(long timeoutMs) for that. However, the mapping does not stop after the given timeout.

I read from Azure Event Hubs and passed 2 minutes (120000 ms) as the timeout to awaitAnyTermination, but the mapping keeps running on the Azure Databricks cluster.

Below is my code. It reads from Azure Event Hubs, writes to the console, and passes 120000 ms to awaitAnyTermination.

import org.apache.spark.eventhubs._

// Event hub configurations
// Replace the values below with your own
val connStr = ConnectionStringBuilder()
      .setNamespaceName("iisqaeventhub")
      .setEventHubName("devsource")
      .setSasKeyName("RootManageSharedAccessKey")
      .setSasKey("saskey")
      .build

val customEventhubParameters = EventHubsConf(connStr)
      .setMaxEventsPerTrigger(5)
      .setStartingPosition(EventPosition.fromEndOfStream)

// reading from the Azure event hub
val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load()

// write to console
val query = incomingStream.writeStream
      .outputMode("append")
      .format("console")
      .start()

// awaitAnyTermination for shutting down the query
sqlContext.streams.awaitAnyTermination(120000)

I expected the mapping to end after the timeout. There is no error, but the mapping does not stop.

Upvotes: 0

Views: 2253

Answers (2)

Ged

Reputation: 18013

When using Databricks for prototyping, this is what I use to stop Spark Structured Streaming apps from a separate notebook pane:

import org.apache.spark.streaming._
StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }
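Note that StreamingContext belongs to the older DStream API, while Structured Streaming queries like the one in the question are tracked by the session's StreamingQueryManager (spark.streams). The following is a sketch of the analogous approach for Structured Streaming, assuming a SparkSession named spark as in a Databricks notebook; the StopAllQueries object and its stopAll helper are hypothetical names used for illustration, and the built-in rate source only stands in for a real stream.

```scala
import org.apache.spark.sql.SparkSession

object StopAllQueries {
  // Hypothetical helper: stops every Structured Streaming query that is still
  // active in this SparkSession, leaving the SparkSession itself running.
  // Returns the name of each stopped query, or its id if it was started unnamed.
  def stopAll(spark: SparkSession): Seq[String] =
    spark.streams.active.toSeq.map { query =>
      query.stop()
      Option(query.name).getOrElse(query.id.toString)
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("stop-all-queries")
      .master("local[2]")
      .getOrCreate()
    try {
      // A throwaway query on the built-in "rate" test source.
      spark.readStream.format("rate").load()
        .writeStream.format("console").queryName("demo").start()
      println(s"Stopped: ${stopAll(spark)}")
    } finally {
      spark.stop()
    }
  }
}
```

In a notebook pane this reduces to spark.streams.active.foreach(_.stop()).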

Upvotes: 0

Jacek Laskowski

Reputation: 74669

tl;dr Works as designed.


From the official documentation:

awaitAnyTermination(timeoutMs: Long): Boolean

Returns whether any query has terminated or not (multiple may have terminated).

In other words, awaitAnyTermination only waits and reports; it never stops anything. No streaming query is terminated at any point (before or after timeoutMs) unless it fails with an exception or stop is called on it.
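A minimal sketch of the behaviour the question expects, assuming Spark 2.x or later: wait with a timeout, and if nothing has terminated by then, stop the still-active queries explicitly. It uses the built-in rate source as a stand-in for Event Hubs so it runs without Azure credentials; the StopAfterTimeout object and its runFor method are hypothetical names.

```scala
import org.apache.spark.sql.SparkSession

object StopAfterTimeout {
  // Hypothetical helper: starts a console query on the built-in "rate" source
  // (a stand-in for Event Hubs), waits up to timeoutMs, then stops whatever is
  // still running. Returns true if some query terminated on its own in time.
  def runFor(spark: SparkSession, timeoutMs: Long): Boolean = {
    spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .writeStream
      .outputMode("append")
      .format("console")
      .start()

    // This call only waits: it returns false if nothing terminated within
    // timeoutMs, and it leaves every query running either way.
    val terminated = spark.streams.awaitAnyTermination(timeoutMs)

    // Stopping has to be requested explicitly.
    spark.streams.active.foreach(_.stop())
    terminated
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("stop-after-timeout")
      .master("local[2]")
      .getOrCreate()
    try {
      val terminated = runFor(spark, 120000L)
      println(s"Terminated on its own: $terminated")
    } finally {
      spark.stop()
    }
  }
}
```

If only a single query matters, query.awaitTermination(timeoutMs) followed by query.stop() on the StreamingQuery returned by start() does the same job.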

Upvotes: 2
