Maciej

Reputation: 153

Why is Kafka KTable missing entries?

I have a single-instance Java application that uses a KTable from Kafka Streams. Until recently I could retrieve all the data through the KTable, but then some of the messages seemed to vanish. There should be ~33k messages with unique keys in it.

When I retrieve messages by key, some of them are not returned. I use a ReadOnlyKeyValueStore for the lookups:

final ReadOnlyKeyValueStore<GenericRecord, GenericRecord> store = ((KafkaStreams)streams).store(storeName, QueryableStoreTypes.keyValueStore());
store.get(key);
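
For context, a simplified sketch of the topology that materializes the store (the topic name is illustrative, and the actual topology is more involved):

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

// A KTable read from the input topic and materialized under storeName,
// which backs the ReadOnlyKeyValueStore queried above.
final KStreamBuilder builder = new KStreamBuilder();
final KTable<GenericRecord, GenericRecord> table =
        builder.table("input-topic", storeName);   // topic name is illustrative

final KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();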

These are the configuration settings I pass to KafkaStreams:

final Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, serverId);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

Kafka: 0.10.2.0-cp1
Confluent: 3.2.0

Investigating brought me to some very worrying insights. Using the REST Proxy I read the partitions manually and found that some offsets return an error.

Request: /topics/{topic}/partitions/{partition}/messages?offset={offset}

{
    "error_code": 50002,
    "message": "Kafka error: Fetch response contains an error code: 1"
}

However, no client, whether Java or command line, returns any error; they simply skip over the missing messages, which results in missing data in the KTable. Everything had been fine, and then, without notice, some of the messages seemed to have become corrupt.

I have two brokers, and all the topics have a replication factor of 2 and are fully replicated. Queried separately, both brokers return the same results. Restarting the brokers makes no difference.

Upvotes: 4

Views: 1360

Answers (1)

Maciej

Reputation: 153

By default, the Kafka broker config key cleanup.policy is set to delete, so old messages are removed once the retention limits are reached. Set it to compact to keep the latest message for each key. See log compaction in the Kafka documentation.
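
On the 0.10.2 / Confluent 3.2 setup above, the change is made with the kafka-topics or kafka-configs CLI. As a sketch, assuming a client at Kafka 0.11+ where the AdminClient API is available, the same change can be made programmatically (broker address and topic name are placeholders):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

final Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

try (AdminClient admin = AdminClient.create(props)) {
    // cleanup.policy=compact keeps the latest record per key instead of
    // deleting records once the retention limits are reached.
    final ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "input-topic"); // placeholder
    final Config compact = new Config(Collections.singleton(
            new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)));
    admin.alterConfigs(Collections.singletonMap(topic, compact)).all().get();
}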

Deleting old messages does not change the partition's minimum (beginning) offset, so trying to fetch a message below it causes an error. The error is very vague: Kafka protocol error code 1 is OFFSET_OUT_OF_RANGE, but the REST Proxy only surfaces the numeric code. The Kafka Streams client simply starts reading from the minimum offset, so there is no error at all; the only visible effect is missing data in the KTables.
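
A quick way to confirm this is to compare a partition's beginning and end offsets with a plain consumer: if the beginning offset is above 0, everything below it has been deleted, and fetching those offsets yields OFFSET_OUT_OF_RANGE. A sketch (broker address, topic, and partition are placeholders):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
    final TopicPartition tp = new TopicPartition("input-topic", 0); // placeholder
    final long beginning = consumer.beginningOffsets(Collections.singleton(tp)).get(tp);
    final long end = consumer.endOffsets(Collections.singleton(tp)).get(tp);
    // A beginning offset above 0 means the records below it were deleted.
    System.out.println("beginning=" + beginning + ", end=" + end);
}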

While the application is running, all data may still appear to be available thanks to the caches, even after the messages have been deleted from Kafka itself; it vanishes only after the local state is cleaned up.

Upvotes: 1
