Reputation: 9473
I am creating a Spark streaming application using PySpark 2.2.0.
I am able to create a streaming query:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession \
    .builder \
    .appName("StreamingApp") \
    .getOrCreate()

# Read the existing files once to obtain the schema for the streaming reader
staticDataFrame = spark.read.format("parquet") \
    .option("inferSchema", "true").load("processed/Nov18/")
staticSchema = staticDataFrame.schema

# Pick up the files in the directory one at a time as a stream
streamingDataFrame = spark.readStream \
    .schema(staticSchema) \
    .option("maxFilesPerTrigger", 1) \
    .format("parquet") \
    .load("processed/Nov18/")

# Count rows per date and shift; GroupedData.count() takes no arguments
daily_trs = streamingDataFrame.select("shift", "date", "time") \
    .groupBy("date", "shift") \
    .count()

# Note: the file sink only supports append mode, and append mode on an
# aggregation requires a watermark on an event-time column
writer = daily_trs.writeStream \
    .format("parquet") \
    .option("path", "data") \
    .option("checkpointLocation", "data/checkpoints") \
    .queryName("streamingData") \
    .outputMode("append")

query = writer.start()
query.awaitTermination()
The query is running, and any additional file added to "processed/Nov18" is processed and stored to "data/".
If the streaming fails, I want to restart the same query.
Path to solution
According to the official documentation, I can get an id that can be used to restart the query: https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html?highlight=streamingquery#pyspark.sql.streaming.StreamingQuery.id
The pyspark.streaming module contains the StreamingContext class, which has the classmethod
getActiveOrCreate(checkpointPath, setupFunc): https://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext.getOrCreate
Can these methods be used somehow?
Does anyone have an example of a production-ready streaming app for reference?
Upvotes: 1
Views: 2369
Reputation: 74759
You should simply (re)start the pyspark application with the checkpoint directory available and Spark Structured Streaming does the rest. No changes required.
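If you would rather restart the query from inside the driver than relaunch the whole application, here is a minimal sketch of one way to do it. It assumes that awaitTermination() raises when the query fails, which is the documented behaviour of StreamingQuery. The helper names run_with_restarts and start_streaming_query, and the max_restarts and backoff_seconds parameters, are made up for illustration and are not part of Spark. The aggregation from the question is left out to keep the example short. The only thing that matters for recovery is that checkpointLocation stays the same between starts.

```python
import time


def run_with_restarts(start_query, max_restarts=3, backoff_seconds=0.0):
    """Start a streaming query and start it again if awaitTermination() raises.

    start_query is a zero-argument callable (hypothetical helper) that returns
    an object with awaitTermination(), e.g. a pyspark StreamingQuery.
    Returns the number of restarts that were needed.
    """
    restarts = 0
    while True:
        query = start_query()
        try:
            query.awaitTermination()
            return restarts  # the query stopped without an error
        except Exception:
            if restarts >= max_restarts:
                raise  # give up and surface the last failure
            restarts += 1
            time.sleep(backoff_seconds)


def start_streaming_query():
    """Rebuild the query from the question (without the aggregation).

    Reusing the same checkpointLocation is what lets Structured Streaming
    resume from where the previous run stopped.
    """
    from pyspark.sql import SparkSession  # imported lazily; requires pyspark

    spark = SparkSession.builder.appName("StreamingApp").getOrCreate()
    schema = spark.read.format("parquet").load("processed/Nov18/").schema
    stream = (spark.readStream
              .schema(schema)
              .option("maxFilesPerTrigger", 1)
              .format("parquet")
              .load("processed/Nov18/"))
    return (stream.writeStream
            .format("parquet")
            .option("path", "data")
            .option("checkpointLocation", "data/checkpoints")
            .queryName("streamingData")
            .outputMode("append")
            .start())
```

You would call run_with_restarts(start_streaming_query) at the end of the driver script. For failures that kill the driver itself, you still need something outside the process (a cluster manager or a scheduler) to relaunch the application. The checkpoint directory handles the rest in both cases.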
Does anyone have an example of a production-ready streaming app for reference?
I'd ask on the Spark users mailing list.
Upvotes: 1