jrip

Reputation: 140

How to make Spark streams execute sequentially

Issue

I have a job that runs two streams, and I want the second one to start only after the first has finished. The first stream saves events from the readStream into a Delta table, and that table is the input for the second stream. The problem is that the rows added by the first stream are not available to the second stream within the same notebook run, because both streams start simultaneously.

Is there a way to enforce the order while running it from the same notebook?

I've tried the awaitTermination function but discovered this does not solve my problem. Some pseudocode:

def main():
    # Read eventhub
    metricbeat_df = spark \
        .readStream \
        .format("eventhubs") \
        .options(**eh_conf) \
        .load()

    # Save raw events
    metricbeat_df.writeStream \
        .trigger(once=True) \
        .format("delta") \
        .partitionBy("year", "month", "day") \
        .outputMode("append") \
        .option("checkpointLocation", "dbfs:/...") \
        .queryName("query1") \
        .table("my_db.raw_events")

    # Parse events
    metricbeat_df = spark.readStream \
        .format("delta") \
        .option("ignoreDeletes", True) \
        .table("my_db.raw_events")
  
    # *Do some transformations here*

    metricbeat_df.writeStream \
        .trigger(once=True) \
        .format("delta") \
        .partitionBy("year", "month", "day") \
        .outputMode("append") \
        .option("checkpointLocation", "dbfs:/...") \
        .queryName("query2") \
        .table("my_db.joined_bronze_events")

TLDR

To summarize the issue: when I run the code above, query1 and query2 start at the same time. As a result, my_db.joined_bronze_events lags slightly behind my_db.raw_events, because the rows added by query1 are not yet available to query2 in the current run (they will be picked up in the next run, of course).

Is there a way to enforce that query2 will not start until query1 has finished while still running it in the same notebook?

Upvotes: 3

Views: 1257

Answers (1)

Michael Heil

Reputation: 18525

Since you are using the Trigger.Once option, you can call the processAllAvailable method on each StreamingQuery:

def main():
    # Read eventhub
    # note that I have changed the variable name to metricbeat_df1
    metricbeat_df1 = spark \
        .readStream \
        .format("eventhubs") \
        .options(**eh_conf) \
        .load()

    # Save raw events
    metricbeat_df1.writeStream \
        .trigger(once=True) \
        .format("delta") \
        .partitionBy("year", "month", "day") \
        .outputMode("append") \
        .option("checkpointLocation", "dbfs:/...") \
        .queryName("query1") \
        .table("my_db.raw_events") \
        .processAllAvailable()

    # Parse events
    # note that I have changed the variable name to metricbeat_df2
    metricbeat_df2 = spark.readStream \
        .format("delta") \
        .option("ignoreDeletes", True) \
        .table("my_db.raw_events")
  
    # *Do some transformations here*

    metricbeat_df2.writeStream \
        .trigger(once=True) \
        .format("delta") \
        .partitionBy("year", "month", "day") \
        .outputMode("append") \
        .option("checkpointLocation", "dbfs:/...") \
        .queryName("query2") \
        .table("my_db.joined_bronze_events") \
        .processAllAvailable()

Note that I have changed the DataFrame names, as they should not be the same for both streaming queries.

The method processAllAvailable is described as:

"Blocks until all available data in the source has been processed and committed to the sink. This method is intended for testing. Note that in the case of continually arriving data, this method may block forever. Additionally, this method is only guaranteed to block until data that has been synchronously appended data to a org.apache.spark.sql.execution.streaming.Source prior to invocation. (i.e. getOffset must immediately reflect the addition)."
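If there are more than two queries, the same pattern could be factored into a small helper. This is only a sketch: the name run_sequentially is hypothetical and not part of Spark. It assumes each starter is a zero-argument function that starts one query and returns its StreamingQuery (or any object with a name attribute and a processAllAvailable method).

```python
from typing import Callable, Iterable, List


def run_sequentially(starters: Iterable[Callable[[], object]]) -> List[str]:
    """Start each streaming query only after the previous one has caught up.

    `run_sequentially` is a hypothetical helper, not a Spark API. Each starter
    must start exactly one query and return the resulting query object.
    """
    finished = []
    for start in starters:
        # The query is started here, so it cannot run before its predecessor
        # has finished writing.
        query = start()
        # Block until everything available in the source has been committed
        # to the sink, as described in the quoted documentation.
        query.processAllAvailable()
        finished.append(query.name)
    return finished


# Hypothetical usage: start_query1 and start_query2 would each wrap one of the
# writeStream chains above and return what .table(...) returns.
# run_sequentially([start_query1, start_query2])
```

Wrapping each writeStream chain in a function matters here: the chain is only evaluated when the helper calls it, so query2 is not even created until query1 has finished. The caveat from the documentation still applies, so with continually arriving data the call may block forever.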

Upvotes: 2
