Reputation: 1013
We need to use maxOffsetsPerTrigger in the Kafka source with Trigger.Once() in Structured Streaming, but based on this issue it seems Spark 3 reads allAvailable instead. Is there a way to achieve rate limiting in this situation?
Here is sample code in Spark 3:
def options: Map[String, String] = Map(
  "kafka.bootstrap.servers" -> conf.getStringSeq("bootstrapServers").mkString(","),
  "subscribe" -> conf.getString("topic")
) ++
  Option(conf.getLong("maxOffsetsPerTrigger")).map("maxOffsetsPerTrigger" -> _.toString)

val streamingQuery = sparkSession.readStream
  .format("kafka")
  .options(options)
  .load
  .writeStream
  .trigger(Trigger.Once)
  .start()
Upvotes: 1
Views: 3447
Reputation: 2692
With Spark 3.3.0, you should be able to use Trigger.AvailableNow, as discussed in Bartosz's excellent blog: https://www.waitingforcode.com/apache-spark-structured-streaming/what-new-apache-spark-3.3.0-structured-streaming/read
I say should because, as I'm playing around with it, I haven't quite got it to work yet and will be posting a question about this shortly.
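For reference, here is a minimal sketch of what that would look like; the broker address, topic name, rate limit, and checkpoint path below are placeholders, not from the blog post:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("availableNowExample").getOrCreate()

val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")   // placeholder broker
  .option("subscribe", "testTopic")                       // placeholder topic
  .option("maxOffsetsPerTrigger", "10000")                // rate limit honoured per micro-batch
  .load()
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/available-now") // placeholder path
  .trigger(Trigger.AvailableNow())                        // drain all currently available data in rate-limited micro-batches, then stop
  .start()

query.awaitTermination()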
Upvotes: 0
Reputation: 41
Here is how we "solved" this. This is basically the approach mike
wrote about in the accepted answer.
In our case, the size of the message varied very little and we therefore knew how much time the processing of a batch takes. So in a nutshell we:
Trigger.Once()
with Trigger.ProcessingTime(<ms>)
since maxOffsetsPerTrigger
works with this modeawaitTermination(<ms>)
to mimic Trigger.Once()
val kafkaOptions = Map[String, String](
  "kafka.bootstrap.servers" -> "localhost:9092",
  "failOnDataLoss" -> "false",
  "subscribePattern" -> "testTopic",
  "startingOffsets" -> "earliest",
  "maxOffsetsPerTrigger" -> "10" // "batch" size
)

val streamWriterOptions = Map[String, String](
  "checkpointLocation" -> "path/to/checkpoints"
)

val processingInterval = 30000L
val terminationInterval = 15000L

sparkSession
  .readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()
  .writeStream
  .options(streamWriterOptions)
  .format("Console")
  .trigger(Trigger.ProcessingTime(processingInterval))
  .start()
  .awaitTermination(terminationInterval)
This works because the first batch is read and processed respecting the maxOffsetsPerTrigger limit - say, in 10 seconds. The second batch then starts processing but is terminated in the middle of the operation after ~5 s and never reaches the set 30 s mark. The offsets are still stored correctly, though, so Spark picks up and processes this "killed" batch in the next run.
A downside of this approach is that you have to know approximately how much time it takes to process one "batch" - if you set the terminationInterval too low, the job will constantly produce no output.
Of course, if you don't care about the exact number of batches you process in one run, you can easily adjust the processingInterval to be many times smaller than the terminationInterval. In that case, you may process a varying number of batches in one go, while still respecting the value of maxOffsetsPerTrigger.
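As an illustration, a hedged sketch of that variant, with made-up values:

val processingInterval = 1000L   // trigger a micro-batch roughly every second
val terminationInterval = 60000L // let the query run for about a minute, i.e. many rate-limited batches

Each batch still reads at most maxOffsetsPerTrigger offsets; only the number of batches per run varies.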
Upvotes: 3
Reputation: 18475
There is no way around it if you want to properly set a rate limit. If maxOffsetsPerTrigger is not applicable to streaming jobs with the Once trigger, you could do the following to achieve an identical result:
Choose another trigger, use maxOffsetsPerTrigger to limit the rate, and kill this job manually after it has finished processing all data.
Use the options startingOffsets and endingOffsets while making the job a batch job (a sketch follows below). Repeat until you have processed all data within the topic. However, there is a reason why "Streaming in RunOnce mode is better than Batch", as detailed here.
The last option would be to look into the linked pull request and compile Spark on your own.
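For the batch variant with startingOffsets and endingOffsets, here is a minimal sketch; the broker address, topic name, offset JSON, and output path are placeholder assumptions, not taken from the answer:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafkaOffsetRangeBatch").getOrCreate()

// Read a bounded slice of the topic as a batch DataFrame.
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")      // placeholder broker
  .option("subscribe", "testTopic")                          // placeholder topic
  .option("startingOffsets", """{"testTopic":{"0":0}}""")    // per-partition start offsets (inclusive)
  .option("endingOffsets", """{"testTopic":{"0":10000}}""")  // per-partition end offsets (exclusive)
  .load()

// Persist the slice, then repeat with the next offset range until the topic is exhausted.
df.write.mode("append").parquet("/tmp/kafka-batch-output")   // placeholder sink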
Upvotes: 4