Reputation: 140
I have a job that runs two streams, and I want the second one to start only after the first stream has finished, since the first stream saves the events from the read stream into a Delta table that serves as input for the second stream. The problem is that whatever is added by the first stream is not available to the second stream in the current notebook run, because both streams start simultaneously.
Is there a way to enforce the order while running everything from the same notebook?
I've tried the awaitTermination function but discovered it does not solve my problem. Some pseudocode:
def main():
    # Read from Event Hubs
    metricbeat_df = spark \
        .readStream \
        .format("eventhubs") \
        .options(**eh_conf) \
        .load()

    # Save raw events
    metricbeat_df.writeStream \
        .trigger(once=True) \
        .format("delta") \
        .partitionBy("year", "month", "day") \
        .outputMode("append") \
        .option("checkpointLocation", "dbfs:/...") \
        .queryName("query1") \
        .table("my_db.raw_events")

    # Parse events
    metricbeat_df = spark.readStream \
        .format("delta") \
        .option("ignoreDeletes", True) \
        .table("my_db.raw_events")

    # *Do some transformations here*

    metricbeat_df.writeStream \
        .trigger(once=True) \
        .format("delta") \
        .partitionBy("year", "month", "day") \
        .outputMode("append") \
        .option("checkpointLocation", "dbfs:/...") \
        .queryName("query2") \
        .table("my_db.joined_bronze_events")
To summarize the issue: when I run the code above, query1 and query2 start at the same time, which causes my_db.joined_bronze_events to lag a bit behind my_db.raw_events, because what is added in query1 is not available to query2 in the current run (it will be in the next run, of course).
Is there a way to enforce that query2 does not start until query1 has finished, while still running both in the same notebook?
Upvotes: 3
Views: 1257
Reputation: 18525
As you are using the option Trigger.once, you can make use of the processAllAvailable method on your StreamingQuery:
def main():
    # Read from Event Hubs
    # note that I have changed the variable name to metricbeat_df1
    metricbeat_df1 = spark \
        .readStream \
        .format("eventhubs") \
        .options(**eh_conf) \
        .load()

    # Save raw events and block until all available data has been processed
    metricbeat_df1.writeStream \
        .trigger(once=True) \
        .format("delta") \
        .partitionBy("year", "month", "day") \
        .outputMode("append") \
        .option("checkpointLocation", "dbfs:/...") \
        .queryName("query1") \
        .table("my_db.raw_events") \
        .processAllAvailable()

    # Parse events
    # note that I have changed the variable name to metricbeat_df2
    metricbeat_df2 = spark.readStream \
        .format("delta") \
        .option("ignoreDeletes", True) \
        .table("my_db.raw_events")

    # *Do some transformations here*

    metricbeat_df2.writeStream \
        .trigger(once=True) \
        .format("delta") \
        .partitionBy("year", "month", "day") \
        .outputMode("append") \
        .option("checkpointLocation", "dbfs:/...") \
        .queryName("query2") \
        .table("my_db.joined_bronze_events") \
        .processAllAvailable()
Note that I have changed the DataFrame names, as they should not be the same for both streaming queries.
The method processAllAvailable is described as:
"Blocks until all available data in the source has been processed and committed to the sink. This method is intended for testing. Note that in the case of continually arriving data, this method may block forever. Additionally, this method is only guaranteed to block until data that has been synchronously appended data to a org.apache.spark.sql.execution.streaming.Source prior to invocation. (i.e. getOffset must immediately reflect the addition)."
Upvotes: 2