irrelevantUser

Reputation: 1322

Spark Kafka Structured Streaming: Issue - Concurrent update to the log. Multiple streaming jobs detected

I am experimenting with Structured Streaming: reading from a Kafka source and sinking the results back to Kafka topics.

In my current setup, I am scheduling two Spark jobs via spark-submit.

Each job reads from its own unique Kafka topic, but both of them write to a shared topic.

My current spark-defaults.conf includes:

spark.streaming.concurrentJobs 5
spark.scheduler.mode FAIR

When either job is scheduled on its own, it works as expected. However, when I try to run them together by submitting one after the other, the job submitted first stops responding with these logs:

java.lang.AssertionError: assertion failed: Concurrent update to the log. Multiple streaming jobs detected for 10
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcV$sp(MicroBatchExecution.scala:339)

Are there some configs that I am missing? How do we schedule concurrent jobs writing to the same Kafka topic in Spark? I appreciate your thoughts.


Upvotes: 2

Views: 5037

Answers (1)

bpirvu

Reputation: 811

I had the same error while trying to write to two Kafka sinks, and here is how I solved it. I'm not sure whether this applies to your case, since in mine I was using checkpointing for failure recovery.

Using the same checkpoint location for the two sinks is what caused the problem. So if you are using checkpoints, this might be the solution.

Here is the Python code that produced the error in my case:

output_query_1 = aggDF_1.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .option("topic", TOPIC_1) \
    .outputMode("append") \
    .start()

output_query_2 = aggDF_2.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .option("topic", TOPIC_2) \
    .outputMode("append") \
    .start()

Passing a different checkpoint directory as checkpointLocation for each query solved it for me:

output_query_1 = aggDF_1.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("checkpointLocation", "/tmp/checkpoint_1") \
    .option("topic", TOPIC_1) \
    .outputMode("append") \
    .start()

output_query_2 = aggDF_2.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("checkpointLocation", "/tmp/checkpoint_2") \
    .option("topic", TOPIC_2) \
    .outputMode("append") \
    .start()
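To avoid this kind of collision systematically, you could derive the checkpoint directory from the topic name so that no two queries can ever share the same offset/commit log. This is just a sketch; the helper name `checkpoint_for` is my own, not part of any Spark API:

```python
# Hypothetical convention: one unique checkpoint directory per output topic,
# so every streaming query gets its own offset/commit log.
def checkpoint_for(base_dir: str, topic: str) -> str:
    return f"{base_dir}/{topic}"

# Each query would then use:
# .option("checkpointLocation", checkpoint_for("/tmp/checkpoints", TOPIC_1))
print(checkpoint_for("/tmp/checkpoints", "topic_1"))  # /tmp/checkpoints/topic_1
```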

Upvotes: 3
