SRuwe

Reputation: 41

Capturing Kafka offsets when writing with Spark Structured Streaming

I am using Spark Structured Streaming on Spark 2.2 to stream files from an HDFS directory to a Kafka topic. I would like to capture the Kafka offsets of the data I write to the topic.

I write to Kafka with:

val write = jsonDF
  .writeStream
  .format("kafka")
  .option("checkpointLocation", Config().getString(domain + ".kafkaCheckpoint"))
  .option("kafka.bootstrap.servers", Config().getString(domain + ".kafkaServer"))
  .option("topic", Config().getString(domain + ".kafkaTopic"))
  .start()

When I register a listener to capture the stream's progress:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
    println("Query started: " + queryStarted.id)
  }
  override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
    println("Query terminated: " + queryTerminated.id)
  }
  override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
    println("Query made progress: " + queryProgress.progress)
  }
})

the information retrieved does not correlate with the offsets being created in Kafka.

I assume that this is because the information provided by the stream is really about the file stream I am utilizing and not related to what is written in Kafka.

Is there a way with Spark Structured Streaming to capture the offset information that is being generated when we write to Kafka?

Adding Example: When I run data in from source 1 with three rows after just creating the topic I get:

Run 1:

  Start Offset: null, End offset: {"logOffset":0}
  Start Offset: {"logOffset":0}, End offset: {"logOffset":0}

 Kafka Says:
 ruwe:2:1
 ruwe:1:1
 ruwe:0:1

Run 2:

  Start Offset: {"logOffset":0}, End offset: {"logOffset":1}
  Start Offset: {"logOffset":1}, End offset: {"logOffset":1}

 Kafka Says:
 ruwe:2:2
 ruwe:1:2
 ruwe:0:2

Run 3:

  Start Offset: {"logOffset":1}, End offset: {"logOffset":2}
  Start Offset: {"logOffset":2}, End offset: {"logOffset":2}

 Kafka Says:
 ruwe:2:3
 ruwe:1:3
 ruwe:0:3

I then ran data in with the same program from a different source and received:

  Start Offset: null, End offset: {"logOffset":0}
  Start Offset: {"logOffset":0}, End offset: {"logOffset":0}

and, of course, the Kafka offsets continued to increment.

This indicates that Spark is reporting information based on the source.

I would like to know what was created in the target.

Upvotes: 3

Views: 1706

Answers (2)

SRuwe

Reputation: 41

After reading the Spark Structured Streaming code, specifically KafkaWriter, KafkaWriteTask and CachedKafkaProducer, I found that Spark does not consume the offsets that the KafkaProducer returns in its callback. The callback Spark defines only captures exceptions. Based on this, I would say that in the current release (2.2) it cannot be done.

The information Spark provides is all about the source of the query, not the target.
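
One possible workaround, which swaps the built-in Kafka sink for the foreach sink, is to send the records yourself and read the offsets from the producer callback. The following is only a minimal sketch, not something Spark ships: the class name OffsetLoggingKafkaWriter is hypothetical, and it assumes the DataFrame has a string column named "value" and that kafka-clients is on the classpath.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.spark.sql.{ForeachWriter, Row}

// Hypothetical writer (not part of Spark): sends each row's "value" column
// to Kafka and prints the topic:partition:offset the broker assigned to it.
class OffsetLoggingKafkaWriter(bootstrapServers: String, topic: String)
    extends ForeachWriter[Row] {

  // Created in open() on the executor, because KafkaProducer is not serializable.
  @transient private var producer: KafkaProducer[String, String] = _

  override def open(partitionId: Long, version: Long): Boolean = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    producer = new KafkaProducer[String, String](props)
    true // process every partition of every batch
  }

  override def process(row: Row): Unit = {
    // Assumption: the row has a string column called "value".
    val record = new ProducerRecord[String, String](topic, row.getAs[String]("value"))
    producer.send(record, new Callback {
      // Called once the broker acknowledges (or rejects) the record.
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
        if (exception == null)
          println(s"${metadata.topic}:${metadata.partition}:${metadata.offset}")
        else
          exception.printStackTrace()
    })
  }

  // close() on the producer waits for pending sends, so their callbacks still fire.
  override def close(errorOrNull: Throwable): Unit =
    if (producer != null) producer.close()
}
```

It would be used as jsonDF.writeStream.foreach(new OffsetLoggingKafkaWriter(server, topic)) in place of format("kafka"), keeping the checkpointLocation option. The callback runs on the executors, so the output lands in the executor logs rather than on the driver, and the foreach sink can write a record more than once if a batch is retried.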

Upvotes: 1

Yuval Itzchakov

Reputation: 149628

Is there a way with Spark Structured Streaming to capture the offset information that is being generated when we write to Kafka?

Yes. In onQueryProgress, look at StreamingQueryProgress.sources, which is an Array[SourceProgress]. Each SourceProgress has two strings, startOffset and endOffset, which are JSON that you can parse:

import org.apache.spark.sql.streaming.StreamingQueryListener

sparkSession.streams.addListener(new StreamingQueryListener {
  // Nothing to do when the query starts.
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = ()

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    // Print the offset range of the first source, if there is one.
    event.progress.sources.headOption.foreach { src =>
      println(s"Start Offset: ${src.startOffset}, End offset: ${src.endOffset}")
    }
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = ()
})

For a Kafka source, the JSON has the following structure:

"startOffset" : {
  "topic-name" : {
    "0" : 1,
    "1" : 22,
    "2" : 419
  }
},
"endOffset" : {
  "topic-name" : {
    "0" : 10,
    "1" : 100,
    "2" : 1000
  }
}
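
To illustrate the parsing step, here is a rough sketch that pulls the partition/offset pairs out of such a string with a regular expression and no JSON library. The object and method names are made up for this example, and it assumes a single topic with numeric partition keys, as in the structure above. For other sources, such as a file source reporting {"logOffset":0}, it simply returns an empty map.

```scala
// Hypothetical helper, not part of Spark.
object OffsetJson {
  // Matches one "partition" : offset pair, for example "0" : 10.
  private val PartitionOffset = "\"(\\d+)\"\\s*:\\s*(-?\\d+)".r

  // Extracts partition -> offset from a startOffset or endOffset string.
  // Assumes offsets for a single topic; several topics would collide.
  def partitionOffsets(json: String): Map[Int, Long] =
    PartitionOffset
      .findAllMatchIn(json)
      .map(m => m.group(1).toInt -> m.group(2).toLong)
      .toMap

  // Size of the offset range per partition: endOffset minus startOffset.
  // Partitions missing from the start string are counted from offset 0.
  def recordsPerPartition(startJson: String, endJson: String): Map[Int, Long] = {
    val start = partitionOffsets(startJson)
    partitionOffsets(endJson).map { case (partition, end) =>
      partition -> (end - start.getOrElse(partition, 0L))
    }
  }
}
```

Inside onQueryProgress this could be called as OffsetJson.recordsPerPartition(src.startOffset, src.endOffset), guarding against a null startOffset on the first batch. Note that these are still offsets read from the source, not offsets written to the target.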

Upvotes: 1
