Reputation: 157
I'm using the Kafka Processor API, and I don't want to rely only on a time-based approach to decide when to commit the processed messages within a task. Instead, I would like to commit based on either a number of processed messages or a timeout. Is there any way to implement that in Java?
Upvotes: 1
Views: 787
Reputation: 62310
The Processor API allows you to "request" a commit via ProcessorContext#commit(). When you call this method, Kafka Streams will commit as soon as possible. This lets you keep a counter inside your Processor and call commit() based on that counter.
Additionally, you can either keep using the configured commit interval, or effectively disable it by setting it to Long.MAX_VALUE.
You can also schedule a punctuation and call commit() from there, based on either event time or wall-clock time, to get the "timeout" behavior you want.
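As a rough illustration of combining the two triggers, here is a minimal sketch of the bookkeeping only. The class name CommitPolicy and its methods are hypothetical (not part of Kafka Streams); the commit action is passed in as a Runnable, which in a real Processor would presumably be context::commit. It assumes you call onRecord from process() and onPunctuate from a wall-clock punctuation.

```java
// Hypothetical helper, not a Kafka Streams class: requests a commit after
// maxMessages records or maxIntervalMs since the last request, whichever comes first.
class CommitPolicy {
    private final int maxMessages;
    private final long maxIntervalMs;
    private final Runnable commitRequest; // assumed to be context::commit in a real Processor
    private int uncommitted = 0;
    private long lastCommitMs;

    CommitPolicy(int maxMessages, long maxIntervalMs, long nowMs, Runnable commitRequest) {
        this.maxMessages = maxMessages;
        this.maxIntervalMs = maxIntervalMs;
        this.lastCommitMs = nowMs;
        this.commitRequest = commitRequest;
    }

    // Call after each record is processed, e.g. at the end of process().
    void onRecord(long nowMs) {
        uncommitted++;
        if (uncommitted >= maxMessages) {
            requestCommit(nowMs);
        }
    }

    // Call from a punctuation, e.g. one scheduled in init() with
    // context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, ts -> policy.onPunctuate(ts));
    void onPunctuate(long nowMs) {
        if (uncommitted > 0 && nowMs - lastCommitMs >= maxIntervalMs) {
            requestCommit(nowMs);
        }
    }

    // commit() is only a request, so the counter is reset when the request is made,
    // not when Kafka Streams actually commits.
    private void requestCommit(long nowMs) {
        commitRequest.run();
        uncommitted = 0;
        lastCommitMs = nowMs;
    }

    int uncommitted() {
        return uncommitted;
    }
}
```

If you want this policy to be the only thing that triggers commits, you would also set commit.interval.ms to Long.MAX_VALUE as described above; otherwise the regular interval-based commits still happen alongside it.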
Upvotes: 1