Flamma

Reputation: 298

How do I test whether an offset has been committed to Kafka?

I have an Akka Stream Kafka source that is reading from a Kafka topic.

I have a simple option that allows disabling the commit of the message offset. The commit is normally done by calling commitScaladsl.

My problem is I don't know how to test if the offset has been committed or not.

We usually use EmbeddedKafka for testing, but I haven't figured out a way of asking for the last committed offset.

This is an example of the test I have written:

  "KafkaSource" should {
    "consume from a kafka topic and pass the message " in {
      val commitToKafka = true
      val key = "key".getBytes
      val message = "message".getBytes

      withRunningKafka {

        val source = getKafkaSource(commitToKafka)
        val (_, sub) = source
          .toMat(TestSink.probe[CommittableMessage[Array[Byte], Array[Byte], ConsumerMessage.CommittableOffset]])(Keep.both)
          .run()

        val messageOpt = publishAndRequestRetry(topic, key, message, sub, retries)
        messageOpt should not be empty
        messageOpt.get.value shouldBe message
      }
    }
  }

Now I want to add a check for the offset being committed or not.

Upvotes: 2

Views: 4666

Answers (3)

Viktor Podzigun

Reputation: 59

While a ConsumerInterceptor works well, I would also suggest a more general approach for cases where the interceptor from the previous answer can't be used (in integration or end-to-end tests, for example).

The idea is to poll the current offsets for the consumer group:

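import java.util.Collections

import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.TopicPartition
import org.scalatest.Assertion
import scala.jdk.CollectionConverters._ // Scala 2.13+; on 2.12 use scala.collection.JavaConverters._

// kafkaAdmin is assumed to be a Kafka Admin client created elsewhere in the test setup
def assertConsumedFromKafka(record: RecordMetadata): Assertion = {
  record.hasOffset shouldBe true

  val topicPartition = new TopicPartition(record.topic, record.partition)
  val groupSpec      = new ListConsumerGroupOffsetsSpec().topicPartitions(Collections.singletonList(topicPartition))
  val groupId = "YOUR-CONSUMER-GROUP-ID-HERE"
  // ask the broker for the offsets committed by the group for this partition
  val offsets = kafkaAdmin
    .listConsumerGroupOffsets(Collections.singletonMap(groupId, groupSpec))
    .partitionsToOffsetAndMetadata(groupId)
    .get()
  val offset = offsets.get(topicPartition)

  assert(offset != null, s"No offset for $topicPartition, offsets: ${offsets.asScala}")

  // using +1 here to make sure that offset was committed (moved to the next message)
  assert(offset.offset() >= (record.offset() + 1), "Offset wasn't committed yet")
}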

Note the (record.offset() + 1) in the last assert. A committed offset points to the next message to be consumed, so comparing against +1 is what confirms that the current message was processed and its offset was committed.
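To make the +1 rule concrete, here is a minimal sketch of the comparison as a pure function. The names CommitCheck and isCommitted are hypothetical and not part of the answer above. The committed offset is passed as an Option so that "no offset stored for the group" is handled explicitly.

```scala
object CommitCheck {
  /** True when the group's committed offset proves that the record at
    * `recordOffset` was committed, i.e. the committed offset has moved
    * to at least the next message.
    */
  def isCommitted(committedOffset: Option[Long], recordOffset: Long): Boolean =
    committedOffset.exists(_ >= recordOffset + 1)
}
```

For a record published at offset 41, a committed offset of 41 only shows that the previous message was committed, while 42 or higher shows that this one was.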

Upvotes: 0

Flamma

Reputation: 298

I finally solved it using a ConsumerInterceptor, defined as:

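import org.apache.kafka.clients.consumer.{ConsumerInterceptor, ConsumerRecords, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

class Interceptor extends ConsumerInterceptor[Array[Byte], Array[Byte]] {
  // pass records through untouched; only commits are of interest here
  override def onConsume(records: ConsumerRecords[Array[Byte], Array[Byte]]): ConsumerRecords[Array[Byte], Array[Byte]] = records

  // called when offsets get committed; record them so the test can inspect them
  override def onCommit(offsets: java.util.Map[TopicPartition, OffsetAndMetadata]): Unit =
    OffsetRecorder.add(offsets.asScala)

  override def close(): Unit = {}

  // called when the consumer sets up the interceptor, so each test starts with no recorded offsets
  override def configure(configs: java.util.Map[String, _]): Unit = OffsetRecorder.clear

}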

onCommit is called when the commit is done; in this case I just record the offsets. I use the configure method to clear the recorded offsets at the start of each test.
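The OffsetRecorder used by the interceptor is not shown in this answer. One possible shape for it, as a sketch only, is below. The offsetFor method and the choice of a ConcurrentHashMap are assumptions made for illustration; the concurrent map is there because onCommit runs on the consumer's thread while the test reads from another.

```scala
import java.util.concurrent.ConcurrentHashMap

import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

// Hypothetical helper: keeps the latest committed offset seen per partition.
object OffsetRecorder {
  private val committed = new ConcurrentHashMap[TopicPartition, OffsetAndMetadata]()

  // Called from the interceptor's onCommit with the offsets just committed.
  def add(offsets: collection.Map[TopicPartition, OffsetAndMetadata]): Unit =
    offsets.foreach { case (tp, om) => committed.put(tp, om) }

  // Called from the interceptor's configure to start each test empty.
  def clear(): Unit = committed.clear()

  // Latest recorded commit for a partition, or None if nothing was committed.
  def offsetFor(tp: TopicPartition): Option[Long] =
    Option(committed.get(tp)).map(_.offset())
}
```

In a test, OffsetRecorder.offsetFor(new TopicPartition(topic, 0)) would then be expected to be defined when commitToKafka is true and empty when it is false. Because the commit happens asynchronously, the assertion probably needs a retry (for example ScalaTest's eventually).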

Then, when creating the consumer settings for the source, I add the interceptor as a property:

  ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers(s"localhost:${kafkaPort}")
    .withGroupId("group-id")
    .withProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "package.of.my.test.Interceptor")

Upvotes: 2

Nishu Tayal

Reputation: 20810

Kafka stores committed offsets per consumer group, topic and partition. So you can call the consumer's committed() or position() method to check the last committed offset or the current position of a Kafka consumer.

committed() : Get the last committed offset for the given partition (whether the commit happened by this process or another).

position() : Get the offset of the next record that will be fetched (if a record with that offset exists).
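As a rough sketch of how committed() could be used in a test, the helper below looks up the last committed offset for one partition. The names CommittedOffsets and lastCommitted are hypothetical. It assumes a Kafka client version (2.4 or later) where committed() accepts a set of partitions, and a consumer configured with the same group.id as the stream under test.

```scala
import java.util.Collections

import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

// Hypothetical helper around Consumer.committed().
object CommittedOffsets {
  // Returns the last offset committed for `tp` by the consumer's group,
  // or None when the group has not committed anything for that partition.
  def lastCommitted(consumer: Consumer[_, _], tp: TopicPartition): Option[Long] = {
    val committed = consumer.committed(Collections.singleton(tp))
    Option(committed.get(tp)).map(_.offset())
  }
}
```

With commits disabled the result should stay None (or keep its earlier value); with commits enabled it should eventually reach the published record's offset plus one. Note that position(), unlike committed(), only works for partitions currently assigned to that consumer.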

Upvotes: 1
