Saket

Reputation: 3129

How do I stop a spark streaming job?

I have a Spark Streaming job which has been running continuously. How do I stop the job gracefully? I have read the usual recommendations of attaching a shutdown hook in the job monitoring and sending a SIGTERM to the job.

sys.ShutdownHookThread {
  logger.info("Gracefully stopping Application...")
  ssc.stop(stopSparkContext = true, stopGracefully = true)
  logger.info("Application stopped gracefully")
}

This seems to work, but it doesn't feel like the cleanest way to stop the job. Am I missing something here?

From a code perspective this may make sense, but how do you use it in a cluster environment? If we start a Spark Streaming job (with jobs distributed across all the nodes in the cluster), we have to keep track of the PID of the job and the node it is running on, and when we want to stop the process we need to look both of those up. I was hoping there would be a simpler form of job control for streaming jobs.

Upvotes: 33

Views: 27089

Answers (5)

Marco Roy

Reputation: 5295

This answer doesn't work if your stream is already running, but otherwise:

You could use the "available-now" micro-batch trigger. It processes whatever data is available and then stops, and you can do whatever you want between runs: sleep, shut down, vacuum (preferably with an inventory table), start the next run immediately, etc.

This gives you full programmatic control over the orchestration of your stream: you can stop it overnight, on weekends, and so on.
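That orchestration can be sketched as follows (assuming Spark 3.3+, where `trigger(availableNow=True)` exists; the scheduling loop itself is a hypothetical helper, not a Spark API):

```python
import time


def run_in_batches(start_query, pause_seconds=60, max_runs=None):
    """Run an available-now query repeatedly, pausing between runs.

    start_query: a callable that starts and returns a StreamingQuery, e.g.
        lambda: df.writeStream.trigger(availableNow=True).format("...").start()
    Between runs the process is idle, so you can sleep, vacuum, or exit.
    """
    runs = 0
    while max_runs is None or runs < max_runs:
        query = start_query()      # processes everything currently available...
        query.awaitTermination()   # ...then the query stops on its own
        runs += 1
        if max_runs is None or runs < max_runs:
            time.sleep(pause_seconds)
    return runs
```

With `max_runs=None` this loops indefinitely; any external scheduler (cron, Airflow, etc.) can instead call it with `max_runs=1` per invocation.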

Upvotes: 0

Mageswaran

Reputation: 450

It depends on the use case and on how the driver can be used.

Consider the case where you want to collect some N records (tweets) from Spark Structured Streaming, store them in PostgreSQL, and stop the stream once the count crosses N records.

One way of doing this is to use an accumulator and Python threading:

  • Create a Python thread that holds the stream query object and the accumulator, and stop the query once the count is crossed.
  • When starting the stream query, pass the accumulator and update its value for each batch of the stream.

Sharing a code snippet for illustration purposes:

import threading
import time


def check_n_stop_streaming(query, acc, num_records=3500):
    # print_info is the author's logging helper (not shown here)
    while True:
        print_info(f"Number of records received so far {acc.value}")
        if acc.value > num_records:
            query.stop()
            break
        time.sleep(1)
...

count_acc = spark.sparkContext.accumulator(0)

...

def postgresql_all_tweets_data_dump(df,
                                    epoch_id,
                                    raw_tweet_table_name,
                                    count_acc):

    print_info("Raw Tweets...")
    df.select(["text"]).show(50, False)
    count_acc += df.count()  # accumulator update, visible to the monitor thread

    # self._postgresql_* come from the author's surrounding class (not shown)
    mode = "append"
    url = "jdbc:postgresql://{}:{}/{}".format(self._postgresql_host,
                                              self._postgresql_port,
                                              self._postgresql_database)
    properties = {"user": self._postgresql_user,
                  "password": self._postgresql_password,
                  "driver": "org.postgresql.Driver"}
    df.write.jdbc(url=url, table=raw_tweet_table_name, mode=mode, properties=properties)

...

query = tweet_stream.writeStream.outputMode("append"). \
    foreachBatch(lambda df, id :
                 postgresql_all_tweets_data_dump(df=df,
                                                 epoch_id=id,
                                                 raw_tweet_table_name=raw_tweet_table_name,
                                                 count_acc=count_acc)).start()





stop_thread = threading.Thread(target=check_n_stop_streaming,
                               args=(query, count_acc),
                               daemon=True)
stop_thread.start()

query.awaitTermination()
stop_thread.join()

Upvotes: 5

Vladimir Kroz

Reputation: 5367

If all you need is to stop a running streaming application, the simplest way is via the Spark admin UI (you can find its URL in the startup logs of the Spark master).

There is a section in the UI that shows the running streaming applications, with a tiny (kill) link next to each application ID.

Upvotes: -2

Dave

Reputation: 139

It is official now; please see the original Apache documentation here: http://spark.apache.org/docs/latest/configuration.html#spark-streaming

Upvotes: -4

ud3sh

Reputation: 1299

You can stop your streaming context in cluster mode by running the following command, without needing to send a SIGTERM. This stops the streaming context without you having to explicitly stop it via a shutdown hook.

$SPARK_HOME_DIR/bin/spark-submit --master $MASTER_REST_URL --kill $DRIVER_ID

  • $MASTER_REST_URL is the REST URL of the Spark master, i.e. something like spark://localhost:6066
  • $DRIVER_ID is something like driver-20150915145601-0000
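Under the hood, `--kill` talks to the standalone master's REST submission server, so the same request can be issued directly. A minimal sketch, assuming the standalone REST endpoint on port 6066; `kill_driver` and `kill_url` are hypothetical helpers, not Spark API:

```python
from urllib import request


def kill_url(master_rest_url, driver_id):
    # "spark://host:6066" -> "http://host:6066/v1/submissions/kill/<driver_id>"
    base = master_rest_url.replace("spark://", "http://", 1)
    return f"{base}/v1/submissions/kill/{driver_id}"


def kill_driver(master_rest_url, driver_id):
    # The REST submission server expects a POST; the response body is JSON
    req = request.Request(kill_url(master_rest_url, driver_id),
                          data=b"", method="POST")
    with request.urlopen(req) as resp:
        return resp.read().decode()
```

This is convenient when the node issuing the kill does not have a Spark installation but can reach the master over HTTP.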

If you want Spark to stop your app gracefully, you can try setting the following property when your Spark app is initially submitted (see http://spark.apache.org/docs/latest/submitting-applications.html on setting Spark configuration properties):

spark.streaming.stopGracefullyOnShutdown=true

This is not officially documented; I gathered it from reading the 1.4 source code. The flag is honored in standalone mode, but I haven't tested it in cluster mode yet.

I am working with Spark 1.4.*.

Upvotes: 25
