rogue-one
rogue-one

Reputation: 11587

Spark Kafka Streaming making progress but there is no data to be consumed

I have a simple Spark Structured streaming job that uses Kafka 0.10 API to read data from Kafka and write to our S3 storage. From the logs I could see that for the each batch that is triggered the streaming application is making progress and is consuming data from source because that endOffset is greater than startOffset and both are always increasing for each batch. But the numInputRows is always zero and there are no rows written to the S3.

Why is there an progressive increase in offsets but no data is consumed by the spark batch?

19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "90f21e5f-270d-428d-b068-1f1aa0861fb1",
  "runId" : "f09f8eb4-8f33-42c2-bdf4-dffeaebf630e",
  "name" : null,
  "timestamp" : "2019-09-10T15:55:00.000Z",
  "batchId" : 189,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 127,
    "getBatch" : 0,
    "getEndOffset" : 0,
    "queryPlanning" : 24,
    "setOffsetRange" : 36,
    "triggerExecution" : 1859,
    "walCommit" : 1032
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[my_kafka_topic]]",
    "startOffset" : {
      "my_kafka_topic" : {
        "23" : 1206926686,
        "8" : 1158514946,
        "17" : 1258387219,
        "11" : 1263091642,
        "2" : 1226741128,
        "20" : 1229560889,
        "5" : 1170304913,
        "14" : 1207333901,
        "4" : 1274242728,
        "13" : 1336386658,
        "22" : 1260210993,
        "7" : 1288639296,
        "16" : 1247462229,
        "10" : 1093157103,
        "1" : 1219904858,
        "19" : 1116269615,
        "9" : 1238935018,
        "18" : 1069224544,
        "12" : 1256018541,
        "3" : 1251150202,
        "21" : 1256774117,
        "15" : 1170591375,
        "6" : 1185108169,
        "24" : 1202342095,
        "0" : 1165356330
      }
    },
    "endOffset" : {
      "my_kafka_topic" : {
        "23" : 1206928043,
        "8" : 1158516721,
        "17" : 1258389219,
        "11" : 1263093490,
        "2" : 1226743225,
        "20" : 1229562962,
        "5" : 1170307882,
        "14" : 1207335736,
        "4" : 1274245585,
        "13" : 1336388570,
        "22" : 1260213582,
        "7" : 1288641384,
        "16" : 1247464311,
        "10" : 1093159186,
        "1" : 1219906407,
        "19" : 1116271435,
        "9" : 1238936994,
        "18" : 1069226913,
        "12" : 1256020926,
        "3" : 1251152579,
        "21" : 1256776910,
        "15" : 1170593216,
        "6" : 1185110032,
        "24" : 1202344538,
        "0" : 1165358262
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "FileSink[s3://my-s3-bucket/data/kafka/my_kafka_topic]"
  }
}

A simplified version of the spark code is as shown below

  val df =  sparkSession
      .readStream
      .format"kafka")
      .options(Map(
      "kafka.bootstrap.servers" -> "host:1009",
      "subscribe" -> "my_kafka-topic",
      "kafka.client.id" -> "my-client-id",
      "maxOffsetsPerTrigger" -> 1000,
      "fetch.message.max.bytes" -> 6048576
    ))
      .load()


  df
    .writeStream
    .partitionBy("date", "hour")
    .outputMode(OutputMode.Append())
    .format("parquet")
    .options(Map("checkpointLocation" -> "checkpoint", "path" -> "data"))
    .trigger(Trigger.ProcessingTime(Duration("5m")))
    .start()
    .awaitTermination()

Edit: from the logs I also see these before each batch is executed


19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0] Resetting offset for partition my-topic-5 to offset 1168959116.
19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0] Resetting offset for partition my-topic-1 to offset 1218619371.
19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0] Resetting offset for partition my-topic-8 to offset 1157205346.
19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0] Resetting offset for partition my-topic-21 to offset 1255403059.

Upvotes: 0

Views: 1849

Answers (2)

Lokesh
Lokesh

Reputation: 21

This exact issue with updating offsets but no input rows happened to me when I cleaned my checkpoint location to start the streaming afresh but used the old target location (not cleared) for writing the streamed data. After cleaning (changing) both checkpoint and write location it worked just fine.

In this particular case as I cleared the checkpoint location the offsets were getting updated properly. But because I didn't clear the target location (as it had data from 5-6 months of continuous streaming i.e. 100s of 1000s of small files to delete) but apparently spark checks for the spark metadata and as it found old data in there it didn't consume any new data.

Upvotes: 2

Dhaval Patel
Dhaval Patel

Reputation: 36

Can you check is any of the case related to output directory and checkpoint location mentioned in below link is applicable in your case?

https://kb.databricks.com/streaming/file-sink-streaming.html

Upvotes: 2

Related Questions