Reputation: 4532
versions:
According to the documentation, to commit offsets to Kafka I should use:
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
As a result, offsets are committed only at the start of the next batch, which causes a 'constant' lag.
Is there any workaround to commit offsets at the end of the current batch (while still using the same Kafka group for offsets)?
Upvotes: 3
Views: 1080
Reputation: 149538
Is there any workaround to commit offsets at the end of the current batch
Not via the commitAsync API. The method call only queues up the offsets to be committed; the asynchronous commit itself is performed later, during DirectKafkaInputDStream.compute:
override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
  val untilOffsets = clamp(latestOffsets())
  // Create KafkaRDD and other irrelevant code
  currentOffsets = untilOffsets
  commitAll()
  Some(rdd)
}
Where commitAll just polls the queue being filled up by commitAsync:
protected def commitAll(): Unit = {
  val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]()
  var osr = commitQueue.poll()
  while (null != osr) {
    val tp = osr.topicPartition
    val x = m.get(tp)
    val offset = if (null == x) { osr.untilOffset } else { Math.max(x.offset, osr.untilOffset) }
    m.put(tp, new OffsetAndMetadata(offset))
    osr = commitQueue.poll()
  }
  if (!m.isEmpty) {
    consumer.commitAsync(m, commitCallback.get)
  }
}
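To make the one-batch lag concrete, here is a toy model of the queue-and-drain behaviour in plain Scala, with no Spark or Kafka involved. The names ToyOffsetRange, ToyStream and CommitLagDemo are made up for illustration, and the "committed" map merely stands in for what the broker would see; it is a sketch of the mechanism, not the actual Spark code.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable

// Hypothetical stand-in for Spark's OffsetRange, reduced to what the model needs.
final case class ToyOffsetRange(partition: Int, untilOffset: Long)

// Toy stream: commitAsync only enqueues, compute() drains the queue.
final class ToyStream {
  private val commitQueue = new ConcurrentLinkedQueue[ToyOffsetRange]()

  // Offsets that have actually reached "Kafka" in this model.
  val committed = mutable.Map.empty[Int, Long]

  // Mirrors the real method's behaviour: nothing is sent, the request is queued.
  def commitAsync(r: ToyOffsetRange): Unit = commitQueue.add(r)

  // Called once when a batch is generated, like DirectKafkaInputDStream.compute.
  def compute(): Unit = {
    var r = commitQueue.poll()
    while (r != null) {
      val prev = committed.getOrElse(r.partition, 0L)
      committed(r.partition) = math.max(prev, r.untilOffset)
      r = commitQueue.poll()
    }
  }
}

object CommitLagDemo {
  // Runs `batches` batches of 100 records each and returns the committed
  // offset visible right after each batch has finished its output action.
  def run(batches: Int): Seq[Long] = {
    val stream = new ToyStream
    (1 to batches).map { i =>
      stream.compute()                                // batch i starts: earlier requests are drained
      stream.commitAsync(ToyOffsetRange(0, i * 100L)) // batch i ends: request is only queued
      stream.committed.getOrElse(0, 0L)               // what the broker sees at this point
    }
  }

  def main(args: Array[String]): Unit =
    println(run(3)) // committed offsets after each batch: 0, 100, 200
}
```

After the third batch has processed up to offset 300, the committed offset in the model is still 200: each commit only goes out when the next batch is generated.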
Thus, unfortunately, if you want to commit the offsets transactionally at the end of each batch, you will have to store them in your own data store rather than rely on Kafka's built-in offset commit tracking.
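As a rough sketch of what "your own store" could look like, the following plain-Scala class saves a batch's output and its end offset in one atomic step, and rejects a batch that does not start where the stored offset ends. ToyTransactionalStore and all of its methods are hypothetical; a real job would typically back this with a database transaction, take fromOffset and untilOffset from rdd.asInstanceOf[HasOffsetRanges].offsetRanges inside foreachRDD, and start the stream from the stored offsets on restart.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for a transactional store
// (for example, a single database transaction per batch).
final class ToyTransactionalStore {
  private val offsets = mutable.Map.empty[(String, Int), Long]
  private val results = mutable.ArrayBuffer.empty[String]

  // Offset to resume from for a partition; 0 if nothing has been stored yet.
  def storedOffset(topic: String, partition: Int): Long = synchronized {
    offsets.getOrElse((topic, partition), 0L)
  }

  def storedResults: List[String] = synchronized(results.toList)

  // Writes the batch output and advances the offset together.
  // Returns false and changes nothing when the batch does not begin at the
  // stored offset, e.g. when a batch is replayed after a failure.
  def saveBatch(
      topic: String,
      partition: Int,
      fromOffset: Long,
      untilOffset: Long,
      output: Seq[String]
  ): Boolean = synchronized {
    if (storedOffset(topic, partition) != fromOffset) false
    else {
      results ++= output
      offsets((topic, partition)) = untilOffset
      true
    }
  }
}

object StoreDemo {
  def main(args: Array[String]): Unit = {
    val store = new ToyTransactionalStore
    store.saveBatch("events", 0, 0L, 100L, Seq("a"))   // accepted
    store.saveBatch("events", 0, 0L, 100L, Seq("a"))   // replay of the same batch, rejected
    println(store.storedOffset("events", 0))
  }
}
```

Because results and offsets change together, the stored offset is up to date as soon as the batch finishes, and a replayed batch cannot write its output twice.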
Upvotes: 3