Reputation: 395
I am trying to write a Spark Structured Streaming job that reads from a Kafka topic and writes to separate paths (after performing some transformations) via the writeStream operation. However, when I run the following code, only the first writeStream gets executed and the second is ignored.
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
write_one = df.writeStream \
.foreachBatch(lambda x, y: transform_and_write_to_zone_one(x,y)) \
.start() \
.awaitTermination()
# transform df to df2
write_two = df2.writeStream \
.foreachBatch(lambda x, y: transform_and_write_to_zone_two(x,y)) \
.start() \
.awaitTermination()
I initially thought that my issue was related to this post; however, after changing my code to the following:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
write_one = df.writeStream \
.foreachBatch(lambda x, y: transform_and_write_to_zone_one(x,y)) \
.start()
# transform df to df2
write_two = df2.writeStream \
.foreachBatch(lambda x, y: transform_and_write_to_zone_two(x,y)) \
.start()
write_one.awaitTermination()
write_two.awaitTermination()
I received the following error:
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
I am not sure why the additional code between start() and awaitTermination() would cause the error above (but I think this is probably a separate issue that is referenced in this answer to the same post above). What is the correct way to call multiple writeStream operations within the same job? Would it be best to have both of the writes within the function that is invoked by foreachBatch, or is there a better way to achieve this?
Upvotes: 2
Views: 2159
Reputation: 517
The Spark documentation says that if you need to write the output to multiple locations, you should use the foreachBatch method.
Your code should look something like this:
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF.write.format(...).save(...) // location 1
batchDF.write.format(...).save(...) // location 2
batchDF.unpersist()
}
Note: persist is needed in order to prevent recomputation.
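Since the code in the question is PySpark, a minimal sketch of the same pattern in Python could look like the following (the transform helpers, output paths, and formats here are placeholders, not from the original post):

def write_to_both_zones(batch_df, batch_id):
    # cache the micro-batch so it is not recomputed for each write
    batch_df.persist()
    # hypothetical per-zone transformations -- substitute your own logic
    transform_for_zone_one(batch_df).write.mode("append").parquet("/path/to/zone_one")
    transform_for_zone_two(batch_df).write.mode("append").parquet("/path/to/zone_two")
    batch_df.unpersist()

query = df.writeStream \
    .foreachBatch(write_to_both_zones) \
    .start()

query.awaitTermination()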
You can check more: http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
Upvotes: 5
Reputation: 2108
Just don't call awaitTermination() on each of your stream queries, but only once through the Spark session, e.g.:
spark.streams.awaitAnyTermination()
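Applied to the second snippet from the question (as a sketch, assuming df2 is derived from df before the queries are started), that would look roughly like:

write_one = df.writeStream \
    .foreachBatch(transform_and_write_to_zone_one) \
    .start()

write_two = df2.writeStream \
    .foreachBatch(transform_and_write_to_zone_two) \
    .start()

# block until any of the active streaming queries terminates (or fails)
spark.streams.awaitAnyTermination()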
Upvotes: 2