Natalia

Reputation: 4532

Spark: commit kafka offsets on end of batch

according to documentation to commit offsets in kafka I should use:

stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

As a result, offsets are committed only at the start of the next batch, which causes a 'constant' lag.

Is there any workaround to commit offsets at the end of the current batch (while still using the same Kafka group for offsets)?

Example of lag monitoring: [image]

Upvotes: 3

Views: 1080

Answers (1)

Yuval Itzchakov

Reputation: 149538

Is there any workaround to commit offsets at the end of current batch

Not via the commitAsync API. The method call only queues the offsets to be committed; the asynchronous commit itself happens during DirectKafkaInputDStream.compute, i.e. at the start of the next batch:

override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
  val untilOffsets = clamp(latestOffsets())

  // Create KafkaRDD and other irrelevant code

  currentOffsets = untilOffsets
  commitAll()
  Some(rdd)
}

Where commitAll just polls the queue being filled up by commitAsync:

protected def commitAll(): Unit = {
  val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]()
  var osr = commitQueue.poll()
  while (null != osr) {
    val tp = osr.topicPartition
    val x = m.get(tp)
    val offset = if (null == x) { osr.untilOffset } else { Math.max(x.offset, osr.untilOffset) }
    m.put(tp, new OffsetAndMetadata(offset))
    osr = commitQueue.poll()
  }
  if (!m.isEmpty) {
    consumer.commitAsync(m, commitCallback.get)
  }
}

Thus, unfortunately, if you want to commit the offsets transactionally with the batch output, you're going to have to store them separately in your own store rather than rely on Kafka's built-in offset commit tracking.
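A minimal sketch of that "own store" approach (all names here are hypothetical, not Spark or Kafka API): the batch output and the offsets it covered are made visible together, so a restart resumes exactly where the last successful batch ended. An in-memory map stands in for a real transactional database:

```scala
import scala.collection.mutable

// Hypothetical stand-in for Kafka's OffsetRange: which offsets a batch covered.
final case class OffsetRange(topic: String, partition: Int, untilOffset: Long)

// In-memory stand-in for a transactional store (e.g. a database table).
object OffsetStore {
  private val offsets = mutable.Map.empty[(String, Int), Long]
  private val output  = mutable.ArrayBuffer.empty[String]

  // "Transaction": batch output and its offsets become visible together,
  // so a crash between the two writes cannot leave them inconsistent.
  def commitBatch(records: Seq[String], ranges: Seq[OffsetRange]): Unit =
    synchronized {
      output ++= records
      ranges.foreach(r => offsets((r.topic, r.partition)) = r.untilOffset)
    }

  // On restart, read these back to build fromOffsets for the direct stream.
  def committed(topic: String, partition: Int): Option[Long] =
    synchronized { offsets.get((topic, partition)) }
}
```

In the streaming job, the commitBatch call would run inside foreachRDD, in the same transaction as the output write, instead of calling commitAsync.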

Upvotes: 3
