Dawid

Reputation: 682

Termination of Structured Streaming query using Databricks

I would like to understand whether running a cell in a Databricks notebook with the code below and then cancelling it means that the stream read has ended. Or does it require some explicit closing?

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBootstrapServers)
  .option("subscribe", "topic1")
  .load()

display(df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)])

Upvotes: 0

Views: 4380

Answers (1)

Ged

Reputation: 18098

Non-display Mode

It's best to issue this command in a cell:

streamingQuery.stop()

for this type of approach:

import org.apache.spark.sql.streaming.Trigger

val streamingQuery = streamingDF                // Start with our "streaming" DataFrame
  .writeStream                                  // Get the DataStreamWriter
  .queryName(myStreamName)                      // Name the query
  .trigger(Trigger.ProcessingTime("3 seconds")) // Configure for a 3-second micro-batch
  .format("parquet")                            // Specify the sink type, a Parquet file
  .option("checkpointLocation", checkpointPath) // Specify the location of checkpoint files & write-ahead logs
  .outputMode("append")                         // Write only new data to the "file"
  .start(outputPathDir)                         // Start the query, writing to this output directory

Otherwise it continues to run - which is the idea of streaming.

I would not stop the cluster to end a single stream, as that takes down every stream running on it.
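To see the effect of stop() without a Kafka broker, here is a minimal sketch. It assumes a local SparkSession, the built-in rate source as a stand-in for Kafka, and the memory sink so that no checkpoint path is needed. The query name stop_demo is made up for illustration; in a Databricks notebook spark already exists, so getOrCreate() should simply return it.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

// Assumption: local session for illustration only.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("stop-demo")
  .getOrCreate()

// The built-in "rate" source keeps emitting rows, much like an unbounded Kafka topic.
val streamingDF = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "1")
  .load()

val streamingQuery = streamingDF.writeStream
  .queryName("stop_demo")                       // Hypothetical query name
  .format("memory")                             // In-memory sink, no output path required
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("1 second"))
  .start()

println(s"Active after start: ${streamingQuery.isActive}")

streamingQuery.stop()                           // Explicitly terminate the query

println(s"Active after stop: ${streamingQuery.isActive}")
```

Checking isActive before and after makes it easy to confirm that the query really has terminated rather than just the cell.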

Databricks display Mode

Databricks has written a nice set of utilities, but you need to take the course to get them. My folly.

display is a Databricks-specific function. For a streaming DataFrame, give the stream a name like this:

 display(myDF, streamName = "myQuery")

Then proceed as follows in a separate cell, with myStreamName set to the name passed to display:

println(s"Looking for $myStreamName")

for (stream <- spark.streams.active)   // Loop over all active streams
  if (stream.name == myStreamName)     // Single out your stream
    stream.stop()                      // Stop it directly; no need to look it up again by id

This stops the query started by display, which writes to a memory sink.
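The same lookup can be wrapped in a small function. This is only a sketch: stopStreamsNamed is a hypothetical helper name, not part of Spark or Databricks. It relies on spark.streams.active and StreamingQuery.stop(), the same calls used in the loop above.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper: stops every active streaming query with the given name
// and returns how many queries were stopped.
def stopStreamsNamed(spark: SparkSession, name: String): Int = {
  // Unnamed queries have a null name, so they never match a real name here.
  val matching = spark.streams.active.filter(_.name == name)
  matching.foreach(_.stop())
  matching.length
}
```

In a notebook cell you would call it as stopStreamsNamed(spark, "myQuery"); a return value of 0 means no active stream had that name.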

Upvotes: 5
