Reputation: 1322
I am experimenting with Structured Streaming, reading from a Kafka source and sinking the results back to Kafka topics.
In my current setup, I am scheduling two Spark jobs via spark-submit.
Each job reads from its own unique Kafka topic, but both of them write to a shared topic.
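Each job looks roughly like this (the broker address, topic names, and checkpoint path below are placeholders, not my real values):

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "input_topic_a") \
    .load()

query = df.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "shared_output_topic") \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .start()

query.awaitTermination()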
My current spark-defaults.conf includes:
spark.streaming.concurrentJobs 5
spark.scheduler.mode FAIR
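For reference, the two jobs are launched as plain spark-submit invocations (script names below are placeholders):

spark-submit job_topic_a.py
spark-submit job_topic_b.py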
When both jobs are scheduled independently, they work as expected. However, when I schedule them together by submitting one after the other, the job submitted first stops responding with the following error:
java.lang.AssertionError: assertion failed: Concurrent update to the log. Multiple streaming jobs detected for 10
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcV$sp(MicroBatchExecution.scala:339)
Are there some confs that I am missing? How do we schedule concurrent jobs writing to the same Kafka topic in Spark? Appreciate your thoughts.
Upvotes: 2
Views: 5037
Reputation: 811
I had the same error while trying to write to two Kafka sinks, and here is how I solved it. I'm not sure whether this applies to your case, since in my case I was using checkpointing for failure recovery.
Using the same checkpoint location for the two sinks is what caused the problem. So if you are using checkpoints, this might be the solution...
Here is the code (using Python) that produced the error in my case:
# Note: both queries point at the same checkpointLocation ("/tmp/checkpoint").
output_query_1 = aggDF_1.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .option("topic", TOPIC_1) \
    .outputMode("append") \
    .start()

output_query_2 = aggDF_2.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .option("topic", TOPIC_2) \
    .outputMode("append") \
    .start()
Passing a different checkpoint location to each query as checkpointLocation solved it for me:
output_query_1 = aggDF_1.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("checkpointLocation", "/tmp/checkpoint_1") \
    .option("topic", TOPIC_1) \
    .outputMode("append") \
    .start()

output_query_2 = aggDF_2.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("checkpointLocation", "/tmp/checkpoint_2") \
    .option("topic", TOPIC_2) \
    .outputMode("append") \
    .start()
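Since both queries run in the same application, I block the driver on the streaming query manager instead of on a single query (spark.streams.awaitAnyTermination() is standard PySpark API):

# Keep the application alive while both queries run.
spark.streams.awaitAnyTermination()

If your two jobs are separate spark-submit applications, the same rule should still apply: every streaming query needs its own checkpointLocation, so make sure the two applications are not pointing at the same checkpoint directory.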
Upvotes: 3