Aditya

Reputation: 21

Kafka Consumer API jumping offsets

I am using Kafka version 2.0 and the Java Consumer API to consume messages from a topic. We are using a single-node Kafka server with one consumer per partition, and I have observed that the consumer is losing some of the messages. The scenario is:

- The consumer polls the topic (I have created one consumer per thread).
- It fetches the messages and hands each one to a handler.
- It then commits the offsets manually, following "at-least-once" consumer semantics.

In parallel, I have another consumer running with a different group ID. In that consumer I simply increment a message counter and commit the offset, and there is no message loss there.

try {
    //kafkaConsumer.registerTopic();

    consumerThread = new Thread(() -> {
        final String topicName1 = "topic-0";
        final String topicName2 = "topic-1";
        final String topicName3 = "topic-2";
        final String topicName4 = "topic-3";

        String groupId = "group-0";
        final Properties consumerProperties = new Properties();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.13.49:9092");
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
        // Auto-commit is disabled, so offsets move only via the explicit
        // commitSync() below; the auto-commit interval is therefore ignored.
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        consumerProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

        try {
            consumer = new KafkaConsumer<>(consumerProperties);
            consumer.subscribe(Arrays.asList(topicName1, topicName2, topicName3, topicName4));
        } catch (KafkaException ke) {
            logTrace(MODULE, ke);
        }
        while (service.isServiceStateRunning()) {
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, byte[]>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, byte[]> record : partitionRecords) {
                    // Hand each record to the handler before committing.
                    processMessage(record);
                }
            }
            // Commit only after the whole batch is processed: at-least-once.
            consumer.commitSync();
        }
        kafkaConsumer.closeResource();
    }, "KAFKA_CONSUMER");

} catch (Exception e) {
    logTrace(MODULE, e);
}
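For what it's worth, the manual-commit example in the KafkaConsumer javadoc commits per partition rather than once per poll, which narrows how much gets replayed after a crash. A minimal sketch of that variant (running stands in for the service-state check above):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Collections;
import java.util.List;

while (running) {
    ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
    for (TopicPartition partition : records.partitions()) {
        List<ConsumerRecord<String, byte[]>> partitionRecords = records.records(partition);
        for (ConsumerRecord<String, byte[]> record : partitionRecords) {
            processMessage(record);
        }
        // Commit the offset of the next record to consume for this partition;
        // a crash mid-batch replays at most this partition's remainder.
        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
    }
}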

Upvotes: 2

Views: 1941

Answers (2)

Antony Stubbs

Reputation: 13585

You probably shouldn't do what you're doing. You should use subscribe(), with multiple partitions per topic and multiple consumers in the group for high availability, and let the consumer handle the offsets for you.

You don't explain why you're trying to process your topics in this custom way. It's an advanced approach and it leads to issues.

The timestamps on your instances should not have to be synchronised to do normal topic processing.

If you're looking for more performance or to isolate records more carefully to avoid "head of line blocking" consider something like Parallel Consumer (PC).

It also tracks per-record acknowledgement, among other things. Check out Parallel Consumer on GitHub (it's open source, BTW, and I'm the author).
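To give a flavour of what that looks like, here is a rough sketch based on the Parallel Consumer README; the exact builder options and poll callback signature vary between versions, so treat these names as assumptions rather than a definitive usage:

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;

// Wrap an ordinary KafkaConsumer; Parallel Consumer drives it and tracks
// per-record acknowledgement internally.
var options = ParallelConsumerOptions.<String, byte[]>builder()
        .ordering(ProcessingOrder.KEY) // parallel across keys, ordered within a key
        .maxConcurrency(16)
        .consumer(new KafkaConsumer<String, byte[]>(consumerProperties))
        .build();

ParallelStreamProcessor<String, byte[]> processor =
        ParallelStreamProcessor.createEosStreamProcessor(options);
processor.subscribe(Arrays.asList("topic-0", "topic-1", "topic-2", "topic-3"));

// Each record is handed to the callback on a worker thread; offsets are
// committed as records are acknowledged, not in fixed poll-sized batches.
processor.poll(context -> processMessage(context.getSingleConsumerRecord()));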

Upvotes: 0

Manoj Vadehra

Reputation: 846

There seems to be a problem with the usage of subscribe() here.

subscribe() is used to subscribe to topics, not to partitions. To consume from specific partitions you need to use assign(). Read the relevant extracts from the documentation:

public void subscribe(java.util.Collection&lt;java.lang.String&gt; topics)

Subscribe to the given list of topics to get dynamically assigned partitions. Topic subscriptions are not incremental. This list will replace the current assignment (if there is one). It is not possible to combine topic subscription with group management with manual partition assignment through assign(Collection). If the given list of topics is empty, it is treated the same as unsubscribe(). This is a short-hand for subscribe(Collection, ConsumerRebalanceListener), which uses a noop listener. If you need the ability to seek to particular offsets, you should prefer subscribe(Collection, ConsumerRebalanceListener), since group rebalances will cause partition offsets to be reset. You should also provide your own listener if you are doing your own offset management since the listener gives you an opportunity to commit offsets before a rebalance finishes.
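In practice, the listener-based variant the documentation recommends for manual offset management looks roughly like this (a sketch; currentOffsets is a hypothetical map that the processing loop keeps updated with the latest processed offsets):

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Track the next offset to read for every partition we have processed.
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

consumer.subscribe(Arrays.asList("topic-0", "topic-1"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Commit processed offsets before the partitions are handed to another
        // consumer in the group, so the new owner neither re-reads nor skips records.
        consumer.commitSync(currentOffsets);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Nothing required here; positions resume from the committed offsets.
    }
});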


public void assign(java.util.Collection&lt;TopicPartition&gt; partitions)

Manually assign a list of partitions to this consumer. This interface does not allow for incremental assignment and will replace the previous assignment (if there is one). If the given list of topic partitions is empty, it is treated the same as unsubscribe(). Manual topic assignment through this method does not use the consumer's group management functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic metadata change. Note that it is not possible to use both manual partition assignment with assign(Collection) and group assignment with subscribe(Collection, ConsumerRebalanceListener).
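So, if the intent really is one consumer per specific partition, the assign() route would look something like this (a sketch; the topic names and partition numbers are illustrative):

import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;

// Pin this consumer to explicit partitions; no group rebalancing is involved.
TopicPartition partition0 = new TopicPartition("topic-0", 0);
TopicPartition partition1 = new TopicPartition("topic-1", 0);
consumer.assign(Arrays.asList(partition0, partition1));

// With manual assignment the consumer also controls its own position;
// seek to a stored offset (or the beginning) before polling if needed.
consumer.seekToBeginning(Arrays.asList(partition0, partition1));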

Upvotes: 2
