Reputation: 95
Two writeStream calls to the same database sink are not executing in sequence in Spark Structured Streaming 2.2.1. Please suggest how to make them execute in sequence.
val deleteSink = ds1.writeStream
  .outputMode("update")
  .foreach(mydbsink)
  .start()

val UpsertSink = ds2.writeStream
  .outputMode("update")
  .foreach(mydbsink)
  .start()
deleteSink.awaitTermination()
UpsertSink.awaitTermination()
Using the above code, deleteSink is executed after UpsertSink.
Upvotes: 5
Views: 4875
Reputation: 565
If you want to have two streams running in parallel, you have to use
sparkSession.streams.awaitAnyTermination()
instead of
deleteSink.awaitTermination()
UpsertSink.awaitTermination()
In your case, UpsertSink will never start unless deleteSink is stopped or an exception is thrown, as the scaladoc says:
Waits for the termination of this query, either by query.stop() or by an exception. If the query has terminated with an exception, then the exception will be thrown. If the query has terminated, then all subsequent calls to this method will either return immediately (if the query was terminated by stop()), or throw the exception immediately (if the query has terminated with exception).
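A minimal sketch of the parallel setup, assuming `ds1`/`ds2` are streaming Datasets, `mydbsink` is a `ForeachWriter`, and `sparkSession` is the active `SparkSession`, as in the question:

```scala
// start() is non-blocking: each query begins running on its own
// scheduler thread as soon as it is started.
val deleteSink = ds1.writeStream
  .outputMode("update")
  .foreach(mydbsink)
  .start()

val UpsertSink = ds2.writeStream
  .outputMode("update")
  .foreach(mydbsink)
  .start()

// Block the driver until ANY active query terminates, instead of
// blocking on one specific query. Both streams stay alive, and the
// first failure from either query is surfaced here.
sparkSession.streams.awaitAnyTermination()
```

Note that with this pattern the two queries run concurrently; any required ordering between deletes and upserts has to be handled inside the sink itself.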
Upvotes: 11