Reputation: 41
I am using Spark Structured Streaming on Spark 2.2 to stream files from an HDFS directory to a Kafka topic. I would like to capture the Kafka offsets for the data I am writing to the topic.
I am using
val write = jsonDF
  .writeStream
  .format("kafka")
  .option("checkpointLocation", Config().getString(domain + ".kafkaCheckpoint"))
  .option("kafka.bootstrap.servers", Config().getString(domain + ".kafkaServer"))
  .option("topic", Config().getString(domain + ".kafkaTopic"))
  .start()
to write to Kafka.
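For reference, jsonDF is built from a file source roughly like this (the schema and the .inputDir config key here are illustrative, not my real ones):

import org.apache.spark.sql.types._

// Illustrative schema; streaming file sources require one to be set explicitly.
val schema = new StructType()
  .add("id", StringType)
  .add("payload", StringType)

val jsonDF = spark.readStream
  .schema(schema)
  .json(Config().getString(domain + ".inputDir"))  // illustrative config key
  .selectExpr("to_json(struct(*)) AS value")       // the Kafka sink expects a 'value' column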
When I utilize
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
    println("Query started: " + queryStarted.id)
  }

  override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
    println("Query terminated: " + queryTerminated.id)
  }

  override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
    println("Query made progress: " + queryProgress.progress)
  }
})
to capture the stream's progress information, the information retrieved does not correlate with the offsets being created in Kafka.
I assume this is because the information provided by the stream is really about the file source I am using, not about what is written to Kafka.
Is there a way with Spark Structured Streaming to capture the offset information that is generated when we write to Kafka?
Adding an example:
When I run data in from source 1 with three rows, just after creating the topic, I get:
Run 1:
Start Offset: null, End offset: {"logOffset":0}
Start Offset: {"logOffset":0}, End offset: {"logOffset":0}
Kafka Says (topic:partition:offset):
ruwe:2:1
ruwe:1:1
ruwe:0:1
Run 2:
Start Offset: {"logOffset":0}, End offset: {"logOffset":1}
Start Offset: {"logOffset":1}, End offset: {"logOffset":1}
Kafka Says:
ruwe:2:2
ruwe:1:2
ruwe:0:2
Run 3:
Start Offset: {"logOffset":1}, End offset: {"logOffset":2}
Start Offset: {"logOffset":2}, End offset: {"logOffset":2}
Kafka Says:
ruwe:2:3
ruwe:1:3
ruwe:0:3
I then ran data in with the same program from a different source and received:
Start Offset: null, End offset: {"logOffset":0}
Start Offset: {"logOffset":0}, End offset: {"logOffset":0}
and of course the Kafka offsets continued to increment.
This indicates that Spark is reporting information based on the source; I would like to know what was created in the target.
Upvotes: 3
Views: 1706
Reputation: 41
After reading the Spark Structured Streaming code, specifically KafkaWriter, KafkaWriteTask, and CachedKafkaProducer: Spark does not consume the offsets returned from the KafkaProducer in its callback; the callback it defines only captures exceptions. Based on this, I would say that in the current release (2.2) it cannot be done.
The information Spark provides is all about the source of the query, not the target.
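In the meantime, one workaround is to ask Kafka itself: the end offsets the sink produced can be read back with a plain KafkaConsumer before and after each run. A minimal sketch, assuming kafka-clients 0.10.1+, a broker at localhost:9092, and the topic ruwe from the question:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

// Throwaway consumer used only for metadata; it never subscribes or commits.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")  // assumption: your broker list
props.put("group.id", "offset-probe")             // assumption: unused probe group
props.put("key.deserializer", classOf[StringDeserializer].getName)
props.put("value.deserializer", classOf[StringDeserializer].getName)

val consumer = new KafkaConsumer[String, String](props)
try {
  val partitions = consumer.partitionsFor("ruwe").asScala
    .map(p => new TopicPartition(p.topic, p.partition))
  // endOffsets returns the next offset to be written per partition,
  // i.e. the same numbers as the "Kafka Says" lines above.
  consumer.endOffsets(partitions.asJava).asScala.foreach {
    case (tp, offset) => println(s"${tp.topic}:${tp.partition}:$offset")
  }
} finally {
  consumer.close()
}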
Upvotes: 1
Reputation: 149628
Is there a way with Spark Structured Streaming to capture the offset information that is generated when we write to Kafka?
Yes, in onQueryProgress you need to look at StreamingQueryProgress.sources, which is an Array[SourceProgress]. It has two strings, startOffset and endOffset, which are JSONs that you can parse:
sparkSession.streams.addListener(new StreamingQueryListener {
  // No-op instead of ???, which would throw NotImplementedError when a query starts
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = ()

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    val source = event.progress.sources.headOption
    source.foreach(src => println(s"Start Offset: ${src.startOffset}, End offset: ${src.endOffset}"))
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = ()
})
For a Kafka source, the JSON has the following structure (a file source, as in your case, reports a single {"logOffset":N}):
"startOffset" : {
"topic-name" : {
"0" : 1,
"1" : 22,
"2" : 419,
}
},
"endOffset" : {
"topic-name" : {
"0" : 10,
"1" : 100,
"2" : 1000
}
}
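Spark itself depends on json4s, so one way to turn those strings into per-partition numbers is a small helper like this (partitionOffsets is a hypothetical name; the code assumes the Kafka-source shape shown above):

import org.json4s._
import org.json4s.jackson.JsonMethods._

// Hypothetical helper: extracts partition -> offset for one topic from a
// startOffset/endOffset JSON string shaped like the example above.
def partitionOffsets(json: String, topic: String): Map[Int, Long] = {
  implicit val formats: Formats = DefaultFormats
  (parse(json) \ topic).extract[Map[String, Long]]
    .map { case (partition, offset) => partition.toInt -> offset }
}

// e.g. partitionOffsets(src.endOffset, "topic-name") gives Map(0 -> 10, 1 -> 100, 2 -> 1000)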
Upvotes: 1