girafi

Reputation: 130

Spring Kafka Consumer skips some offsets

We have a very simple Kafka consumer (v2.6.2). It is the only consumer in its consumer group, and this group is the only one reading from the topic (which has 6 partitions and some 3 million events in it). The broker is also on version 2.6.x.

As we need to fulfil an "exactly-once" scenario, we took a deep look at whether we really consume every event written to the topic exactly once. Unfortunately, we found out that the consumer sometimes skips an offset, and sometimes even a whole bunch of offsets, within a partition.

The consumer does not do much more than logging. An error handler is configured, which also just logs.

@KafkaListener(id = "${spring.kafka.consumer.group-id}", topics = "${topicname}")
public void onMessage(ConsumerRecord<String, BlaBlaEvent> record) throws Exception {
  // mostly logging
}

And it logs:

consuming Offset=30 Partition=2
consuming Offset=31 Partition=2
consuming Offset=32 Partition=2
consuming Offset=67 Partition=2

(note the jump from Offset=32 straight to Offset=67)

Here's our application.properties

spring.kafka.bootstrap-servers=${KAFKA_BOOTSTRAP_SERVERS_URL}
spring.kafka.properties.security.protocol=SSL
spring.kafka.properties.schema.registry.url=${SCHEMA_REGISTRY_URL}
spring.kafka.properties.specific.avro.reader=true

spring.kafka.ssl.trust-store-type=PKCS12
spring.kafka.ssl.key-store-type=PKCS12
spring.kafka.ssl.key-store-password=${KEYSTORE_PASSWORD}
spring.kafka.ssl.trust-store-password=${TRUSTSTORE_PASSWORD}
spring.kafka.ssl.trust-store-location=${TRUSTSTORE_PATH}
spring.kafka.ssl.key-store-location=${KEYSTORE_PATH}

spring.kafka.consumer.client-id=BlablaNotificationReader
spring.kafka.consumer.group-id=blabla-group
spring.kafka.consumer.auto-offset-reset=none
spring.kafka.consumer.max-poll-records=1
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer

logging.level.org.apache.kafka=DEBUG

Does anyone have an idea how to find the root cause of this problem?

Upvotes: 1

Views: 4034

Answers (1)

Tomaz Fernandes

Reputation: 2594

Since you're looking for exactly-once semantics, you probably have transaction-enabled producers. If so, the gaps might be due to the partitions containing transactional records.

The KafkaConsumer documentation states:

Partitions with transactional messages will include commit or abort markers which indicate the result of a transaction. These markers are not returned to applications, yet have an offset in the log. As a result, applications reading from topics with transactional messages will see gaps in the consumed offsets. These missing messages would be the transaction markers, and they are filtered out for consumers in both isolation levels. Additionally, applications using read_committed consumers may also see gaps due to aborted transactions, since those messages would not be returned by the consumer and yet would have valid offsets.
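One way to check what actually occupies those "missing" offsets is to read the partition directly with a plain KafkaConsumer in read_uncommitted mode and seek into the gap. This is only a rough sketch, not your application's code: the topic name, offsets and connection settings are placeholders, and the SSL / schema registry settings from your configuration are omitted.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class GapInspector {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "gap-inspector");             // throwaway group
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");   // also return aborted records
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        // Partition 2 and offset 33 are taken from the gap in the question's log.
        TopicPartition partition = new TopicPartition("your-topic", 2);
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(partition));
            consumer.seek(partition, 33);

            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                // Offsets that show up here but not in read_committed mode belong to
                // aborted transactions; offsets that never show up at all are the
                // commit/abort markers mentioned in the documentation above.
                System.out.printf("offset=%d timestamp=%d%n", record.offset(), record.timestamp());
            }
        }
    }
}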

The only way I can see to actually miss a record would be if the consumer committed an offset at or beyond a record (or records) that didn't get processed for some reason.

If you want more information about the records being polled from the broker and the offsets being committed, you can set the logging level for the org.springframework.kafka.listener.KafkaMessageListenerContainer class to TRACE, as shown below. That should give you good insight into the records being received and the offsets being committed. If all offsets before the one being committed have been processed, it's unlikely you're missing any.
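In the same style as the application.properties above, that would be something like:

logging.level.org.springframework.kafka.listener.KafkaMessageListenerContainer=TRACE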

To check whether the exactly-once semantics actually hold, you could try to correlate your producers' logs with your consumers' logs through some id, especially around the time this gap appears. A rough sketch follows below.
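Just as an illustration (not your actual listener), assuming the record key carries a usable business id, the consumer side could log it along with partition and offset, with the producer logging the same id when it sends the event:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class CorrelatingListener {

    private static final Logger log = LoggerFactory.getLogger(CorrelatingListener.class);

    @KafkaListener(id = "${spring.kafka.consumer.group-id}", topics = "${topicname}")
    public void onMessage(ConsumerRecord<String, Object> record) {
        // Log the key (assumed here to be the correlation id), partition and offset so each
        // consumer entry can be matched against the producer's log line for the same event.
        log.info("consumed key={} partition={} offset={} timestamp={}",
                record.key(), record.partition(), record.offset(), record.timestamp());
    }
}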

You might also want to check the producers' logs from when that happens, since the gap might be due to aborted transactions.

Upvotes: 5
