Running multiple Spark Kafka Structured Streaming queries in the same Spark session increases the offsets but shows numInputRows 0

I have a Spark Structured Streaming job consuming records from a Kafka topic with 2 partitions.

Spark job: 2 queries, each consuming from a separate partition, running in the same Spark session.

    // Query 1 reads only partition 0 of the topic
    val df1 = session.readStream.format("kafka")
            .option("kafka.bootstrap.servers", kafkaBootstrapServer)
            .option("assign", "{\"multi-stream1\" : [0]}")
            .option("startingOffsets", latest)
            .option("key.deserializer", classOf[StringDeserializer].getName)
            .option("value.deserializer", classOf[StringDeserializer].getName)
            .option("max.poll.records", 500)
            .option("failOnDataLoss", true)
            .load()
    val query1 = df1
            .select(col("key").cast("string"), from_json(col("value").cast("string"), schema, Map.empty[String, String]).as("data"))
            .writeStream.format("parquet").option("path", path).outputMode("append")
            .option("checkpointLocation", checkpoint_dir1)
            .partitionBy("key")/*.trigger(Trigger.ProcessingTime("5 seconds"))*/
            .start()
    // Query 2 reads only partition 1 of the topic
    val df2 = session.readStream.format("kafka")
            .option("kafka.bootstrap.servers", kafkaBootstrapServer)
            .option("assign", "{\"multi-stream1\" : [1]}")
            .option("startingOffsets", latest)
            .option("key.deserializer", classOf[StringDeserializer].getName)
            .option("value.deserializer", classOf[StringDeserializer].getName)
            .option("max.poll.records", 500)
            .option("failOnDataLoss", true)
            .load()
    val query2 = df2
            .select(col("key").cast("string"), from_json(col("value").cast("string"), schema, Map.empty[String, String]).as("data"))
            .writeStream.format("parquet").option("path", path).outputMode("append")
            .option("checkpointLocation", checkpoint_dir2)
            .partitionBy("key")/*.trigger(Trigger.ProcessingTime("5 seconds"))*/
            .start()

Problem: whenever records are pushed to both partitions, both queries report progress, but only one of them emits output. I can see output only from the query whose records were processed. For example: records are pushed to Kafka partition 0, and Spark processes them in query1. If records are pushed to Kafka partition 1 while query1 is busy processing, Spark shows the start and end offsets advancing for query2, but numInputRows = 0.

Running env: Local PC - same problem. Dataproc cluster - same problem, submitted with:

    spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.5 --class org.DifferentPartitionSparkStreaming --master yarn --deploy-mode cluster --num-executors 2 --driver-memory 4g --executor-cores 4 --executor-memory 4g gs://dpl-ingestion-event/jars/stream_consumer-jar-with-dependencies.jar "{"multiple-streaming" : [0]}" latest "10.w.x.y:9092,10.r.s.t:9092,10.a.b.c:9092" "{"multiple-streaming" : [1]}"

The checkpoint and output paths are both in a Google Cloud Storage bucket.


    20/07/24 19:37:27 INFO MicroBatchExecution: Streaming query made progress: {
      "id" : "e7d026f7-bf62-4a86-8697-a95a2fc893bb",
      "runId" : "21169889-6e4b-419d-b338-2d4d61999f5b",
      "name" : "reconcile",
      "timestamp" : "2020-07-24T14:06:55.002Z",
      "batchId" : 2,
      "numInputRows" : 0,
      "inputRowsPerSecond" : 0.0,
      "processedRowsPerSecond" : 0.0,
      "durationMs" : {
        "addBatch" : 3549,
        "getBatch" : 0,
        "getEndOffset" : 1,
        "queryPlanning" : 32,
        "setOffsetRange" : 1,
        "triggerExecution" : 32618,
        "walCommit" : 15821
      },
      "stateOperators" : [ ],
      "sources" : [ {
        "description" : "KafkaV2[Assign[multi-stream1-1]]",
        "startOffset" : {
          "multi-stream1" : {
            "1" : 240
          }
        },
        "endOffset" : {
          "multi-stream1" : {
            "1" : 250
          }
        },
        "numInputRows" : 0,
        "inputRowsPerSecond" : 0.0,
        "processedRowsPerSecond" : 0.0
      } ],
      "sink" : {
        "description" : "FileSink[gs://dpl-ingestion-event/demo/test/single-partition/data]"
      }
    }

I was able to resolve the problem. The root cause was that both queries were writing to the same base path, so their _spark_metadata information overlapped. Besides checkpointing, Spark Structured Streaming maintains a _spark_metadata directory under the output path to keep track of the batches that have been written.

Source Spark Doc:

In order to correctly handle partial failures while maintaining exactly once semantics, the files for each batch are written out to a unique directory and then atomically appended to a metadata log. When a parquet based DataSource is initialized for reading, we first check for this log directory and use it instead of file listing when present.

Thus, for now, every query should be given a separate output path. Unlike the checkpoint location, the _spark_metadata location cannot be configured.
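
As a rough illustration of the fix, here is a minimal sketch in which each query gets its own output directory and its own checkpoint directory under shared base paths. The object name `SeparateSinkPaths`, the helpers `sinkPath` and `startQuery`, and the `<topic>-p<partition>` directory naming are hypothetical choices for this example, not part of Spark or of my original job; the JSON parsing step is left out to keep it short.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery

object SeparateSinkPaths {
  // Hypothetical helper: build a per-query directory under a shared base path.
  def sinkPath(basePath: String, queryName: String): String =
    s"${basePath.stripSuffix("/")}/$queryName"

  // Start one Kafka-to-Parquet query for a single partition of `topic`.
  // Each call writes to its own output directory, so each query keeps
  // its own _spark_metadata log instead of sharing one.
  def startQuery(spark: SparkSession,
                 bootstrapServers: String,
                 topic: String,
                 partition: Int,
                 outputBase: String,
                 checkpointBase: String): StreamingQuery = {
    val name = s"$topic-p$partition" // assumed naming scheme
    val df = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("assign", s"""{"$topic":[$partition]}""")
      .option("startingOffsets", "latest")
      .load()

    df.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
      .writeStream.format("parquet")
      .queryName(name)
      .option("path", sinkPath(outputBase, name))               // unique per query
      .option("checkpointLocation", sinkPath(checkpointBase, name)) // unique per query
      .outputMode("append")
      .partitionBy("key")
      .start()
  }
}
```

With this in place, the two queries from the question could be started as `SeparateSinkPaths.startQuery(session, kafkaBootstrapServer, "multi-stream1", 0, path, checkpointBase)` and the same call with partition `1`, followed by `session.streams.awaitAnyTermination()`. Here `checkpointBase` is an assumed variable holding the parent of the checkpoint directories.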

