Tushar Kolhe

Reputation: 9473

How to restart pyspark streaming query from checkpoint data?

I am creating a Spark streaming application using PySpark 2.2.0.

I am able to create a streaming query:

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
      .builder \
      .appName("StreamingApp") \
      .getOrCreate()

staticDataFrame = spark.read.format("parquet")\
.option("inferSchema","true").load("processed/Nov18/")

staticSchema = staticDataFrame.schema
streamingDataFrame = spark.readStream\
.schema(staticSchema)\
.option("maxFilesPerTrigger",1)\
.format("parquet")\
.load("processed/Nov18/")

daily_trs = streamingDataFrame.select("shift","date","time")\
.groupBy("date","shift")\
.count()

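# Assumption: the aggregated daily_trs is the DataFrame being written.
# Append mode on a streaming aggregation requires a watermark.
writer = daily_trs.writeStream\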
   .format("parquet")\
   .option("path","data")\
   .option("checkpointLocation","data/checkpoints")\
   .queryName("streamingData")\
   .outputMode("append")

query = writer.start()
query.awaitTermination()

The query is streaming, and any file added to "processed/Nov18" is processed and stored in "data/".

If the streaming fails, I want to restart the same query.

Path to solution

  1. According to the official documentation, I can get an id that can be used to restart the query: https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html?highlight=streamingquery#pyspark.sql.streaming.StreamingQuery.id

  2. The pyspark.streaming module contains a StreamingContext class that has the classmethod

    classmethod getActiveOrCreate(checkpointPath, setupFunc) https://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext.getOrCreate

Can these methods be used somehow?

Does anyone have a production-ready streaming app that I could use as a reference?

Upvotes: 1

Views: 2369

Answers (1)

Jacek Laskowski

Reputation: 74759

You should simply (re)start the pyspark application with the checkpoint directory available and Spark Structured Streaming does the rest. No changes required.
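If you would rather restart the query inside the same process than relaunch the whole application, one option is a small retry loop around start(). The following is only a minimal sketch under some assumptions: run_with_restarts, start_parquet_query, max_restarts and backoff_seconds are hypothetical names introduced here, not part of the Spark API, and the paths are the ones from the question. The important part is that every restart uses the same checkpointLocation.

```python
import time


def start_parquet_query(streaming_df, out_path="data",
                        checkpoint_path="data/checkpoints"):
    """Hypothetical helper: start the parquet sink from the question.

    Reusing the same checkpointLocation on every start is what lets
    Structured Streaming pick up where the previous run stopped.
    """
    return (streaming_df.writeStream
            .format("parquet")
            .option("path", out_path)
            .option("checkpointLocation", checkpoint_path)
            .outputMode("append")
            .start())


def run_with_restarts(start_query, max_restarts=3, backoff_seconds=0.0):
    """Run a query and start it again if it terminates with an error.

    start_query is a zero-argument callable returning a started query.
    Returns the number of restarts that were needed.
    """
    restarts = 0
    while True:
        query = start_query()
        try:
            # Blocks until the query stops; raises if the query failed.
            query.awaitTermination()
            return restarts
        except Exception:
            # Broad catch for the sketch; narrow it in real code.
            if restarts >= max_restarts:
                raise
            restarts += 1
            time.sleep(backoff_seconds)
```

With the question's code this could be called as run_with_restarts(lambda: start_parquet_query(streamingDataFrame)). As for the id from the documentation: StreamingQuery.id stays the same across restarts from the same checkpoint data, while StreamingQuery.runId changes on every (re)start, so the id identifies the query rather than restarting it.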

Does anyone have a production-ready streaming app that I could use as a reference?

I'd ask on the Spark users mailing list.

Upvotes: 1
