Reputation: 298
I have an Akka Stream Kafka source that is reading from a Kafka topic.
I have a simple task that allows disabling the commit of the message offset. The commit is usually done by calling commitScaladsl.
My problem is that I don't know how to test whether the offset has been committed or not.
We usually use EmbeddedKafka for testing, but I haven't figured out a way to ask for the last committed offset.
This is an example of the test I have written:
"KafkaSource" should {
"consume from a kafka topic and pass the message " in {
val commitToKafka = true
val key = "key".getBytes
val message = "message".getBytes
withRunningKafka {
val source = getKafkaSource(commitToKafka)
val (_, sub) = source
.toMat(TestSink.probe[CommittableMessage[Array[Byte], Array[Byte], ConsumerMessage.CommittableOffset]])(Keep.both)
.run()
val messageOpt = publishAndRequestRetry(topic, key, message, sub, retries)
messageOpt should not be empty
messageOpt.get.value shouldBe message
}
}
Now I want to add a check for whether the offset has been committed or not.
Upvotes: 2
Views: 4666
Reputation: 59
While using a ConsumerInterceptor
works well, I would also suggest a more general approach, in case it's not possible to use the interceptor from the previous answer (in integration or end-to-end tests, for example).
The idea is to poll the current offsets for the consumer group:
import java.util.Collections
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.TopicPartition
import org.scalatest.Assertion

def assertConsumedFromKafka(record: RecordMetadata): Assertion = {
  record.hasOffset shouldBe true
  val topicPartition = new TopicPartition(record.topic, record.partition)
  val groupSpec = new ListConsumerGroupOffsetsSpec().topicPartitions(Collections.singletonList(topicPartition))
  val groupId = "YOUR-CONSUMER-GROUP-ID-HERE"
  val offsets = kafkaAdmin // an org.apache.kafka.clients.admin.Admin instance
    .listConsumerGroupOffsets(Collections.singletonMap(groupId, groupSpec))
    .partitionsToOffsetAndMetadata(groupId)
    .get()
  val offset = offsets.get(topicPartition)
  assert(offset != null, s"No offset for $topicPartition, offsets: ${offsets.asScala}")
  // using +1 here to make sure that offset was committed (moved to the next message)
  assert(offset.offset() >= (record.offset() + 1), "Offset wasn't committed yet")
}
Please note the (record.offset() + 1)
in the last assert: it is crucial to compare using +1 to make sure the current message was processed and its offset was committed (i.e. the committed offset moved to the next message).
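For context, kafkaAdmin is an Admin client and the RecordMetadata comes from the producer that published the test message. A minimal sketch of how both could be wired up (the broker address, topic name and the assumption that EmbeddedKafka listens on its default port 6001 are mine, not part of the original answer):
import java.util.Properties
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.ByteArraySerializer

// Admin client used by assertConsumedFromKafka above
val adminProps = new Properties()
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:6001")
val kafkaAdmin: Admin = Admin.create(adminProps)

// Producer whose send() returns the RecordMetadata to assert on
val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:6001")
val producer = new KafkaProducer[Array[Byte], Array[Byte]](
  producerProps, new ByteArraySerializer, new ByteArraySerializer)

val metadata = producer.send(new ProducerRecord("my-topic", "key".getBytes, "message".getBytes)).get()
// ... let the consumer under test process and commit the message ...
assertConsumedFromKafka(metadata)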
Upvotes: 0
Reputation: 298
I finally solved it using a ConsumerInterceptor, defined as:
class Interceptor extends ConsumerInterceptor[Array[Byte], Array[Byte]] {

  // Pass records through untouched; we only care about commits
  override def onConsume(records: ConsumerRecords[Array[Byte], Array[Byte]]): ConsumerRecords[Array[Byte], Array[Byte]] = records

  // Called after offsets have been committed: record them so the test can assert on them
  override def onCommit(offsets: java.util.Map[TopicPartition, OffsetAndMetadata]): Unit = {
    import scala.collection.JavaConverters._
    OffsetRecorder.add(offsets.asScala)
  }

  override def close(): Unit = {}

  // Called when the consumer is created: start each test with an empty record of commits
  override def configure(configs: java.util.Map[String, _]): Unit = OffsetRecorder.clear()
}
onCommit is called when the commit is done; in this case I just record the committed offsets. I use the configure method to start each test with empty records.
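OffsetRecorder itself is not shown in the answer; a minimal sketch of what it could look like, just a synchronized buffer the test can read (this exact shape is my assumption):
// Hypothetical OffsetRecorder (not part of the original answer): a global,
// synchronized buffer of everything the interceptor saw being committed.
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

object OffsetRecorder {
  private var committed: List[Map[TopicPartition, OffsetAndMetadata]] = Nil

  def add(offsets: scala.collection.Map[TopicPartition, OffsetAndMetadata]): Unit =
    synchronized { committed = offsets.toMap :: committed }

  def clear(): Unit = synchronized { committed = Nil }

  def all: List[Map[TopicPartition, OffsetAndMetadata]] =
    synchronized { committed }
}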
Then, when creating the consumer settings for the source, I add the interceptor as a property:
ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
  .withBootstrapServers(s"localhost:${kafkaPort}")
  .withGroupId("group-id")
  .withProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "package.of.my.test.Interceptor")
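With the interceptor registered, the test can then assert on what was recorded. A hypothetical closing check, assuming the OffsetRecorder sketch above and a spec that mixes in org.scalatest.concurrent.Eventually, since the commit happens asynchronously:
import org.scalatest.time.{Seconds, Span}

// commitToKafka = true: at least one commit should eventually be recorded
eventually(timeout(Span(5, Seconds))) {
  OffsetRecorder.all should not be empty
}

// commitToKafka = false: nothing should ever show up
// OffsetRecorder.all shouldBe empty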
Upvotes: 2
Reputation: 20810
Kafka stores offsets by topic name and partition ID, so you can use the consumer's committed()
or position()
method to check the last committed offset or the current position of the Kafka consumer.
committed() : Get the last committed offset for the given partition (whether the commit happened by this process or another).
position() : Get the offset of the next record that will be fetched (if a record with that offset exists).
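A hedged sketch of how that check could look in a test against EmbeddedKafka, assuming a recent kafka-clients version (the port, topic name and group id are placeholders; the consumer only queries the group coordinator and does not join the group):
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:6001") // EmbeddedKafka's default port, adjust as needed
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id")                // same group as the source under test

val checker = new KafkaConsumer[Array[Byte], Array[Byte]](props, new ByteArrayDeserializer, new ByteArrayDeserializer)
val partition = new TopicPartition("my-topic", 0)

// Last committed offset for the group on this partition; null if nothing has been committed yet
val committed = checker.committed(Collections.singleton(partition)).get(partition)
checker.close()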
Upvotes: 1