Hussein Joe

Reputation: 1

Spring Kafka does not support large message consumers

I am using Spring Kafka to consume messages produced by LinkedIn's Kafka client, which supports large messages.

This client always overrides AUTO_OFFSET_RESET_CONFIG to "none", as shown in its constructor and in configForVanillaConsumer():

private LiKafkaConsumerImpl(LiKafkaConsumerConfig configs,
                            Deserializer<K> keyDeserializer,
                            Deserializer<V> valueDeserializer,
                            Deserializer<LargeMessageSegment> largeMessageSegmentDeserializer,
                            Auditor<K, V> consumerAuditor) {
    _kafkaConsumer = new KafkaConsumer<>(configs.configForVanillaConsumer(),
                                         byteArrayDeserializer,
                                         byteArrayDeserializer);
}

Map<String, Object> configForVanillaConsumer() {
    Map<String, Object> newConfigs = new HashMap<>();
    newConfigs.putAll(this.originals());
    newConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    newConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
    return newConfigs;
}

Once I switch to batch commits and set ENABLE_AUTO_COMMIT_CONFIG to false, the following error is thrown:

[org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener com.linkedin.kafka.clients.consumer.LiKafkaConsumerRebalanceListener for group document-event-consumer failed on partition assignment
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: DocumentEvents-2
    at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:369)
    at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:247)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1602)
    at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1265)
    at com.linkedin.kafka.clients.consumer.LiKafkaConsumerImpl.position(LiKafkaConsumerImpl.java:403)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$1.onPartitionsAssigned(KafkaMessageListenerContainer.java:447)
    at com.linkedin.kafka.clients.consumer.LiKafkaConsumerRebalanceListener.onPartitionsAssigned(LiKafkaConsumerRebalanceListener.java:62)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at com.linkedin.kafka.clients.consumer.LiKafkaConsumerImpl.poll(LiKafkaConsumerImpl.java:231)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:558)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:745)

This happens because this consumer group is consuming from the topic for the first time: there is no committed offset yet, so the consumer falls back to the offset reset policy.

Although I set the policy to "earliest", the underlying LinkedIn Kafka client overrides it to "none".
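To make the override concrete, here is a minimal stand-alone sketch of the same put-after-putAll pattern. It uses plain maps and the literal config keys "auto.offset.reset" and "enable.auto.commit" instead of the real LiKafkaConsumerConfig class; the class name VanillaConfigDemo is hypothetical and exists only for this illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the LinkedIn client's config handling;
// it only mirrors the putAll-then-put order from configForVanillaConsumer().
class VanillaConfigDemo {

    static Map<String, Object> configForVanillaConsumer(Map<String, Object> originals) {
        Map<String, Object> newConfigs = new HashMap<>();
        // Copy everything the user supplied ...
        newConfigs.putAll(originals);
        // ... then unconditionally overwrite these two keys.
        newConfigs.put("enable.auto.commit", false);
        newConfigs.put("auto.offset.reset", "none");
        return newConfigs;
    }

    public static void main(String[] args) {
        Map<String, Object> userConfig = new HashMap<>();
        userConfig.put("auto.offset.reset", "earliest");

        Map<String, Object> effective = configForVanillaConsumer(userConfig);
        // The user's "earliest" is replaced because put() runs after putAll().
        System.out.println(effective.get("auto.offset.reset")); // prints: none
    }
}
```

Because the overwrite happens after the user's settings are copied, no value passed through the consumer properties can change the reset policy.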

I also tried overriding ConsumerRebalanceListener to seek to the beginning manually in this case, but execution never reaches that code.

How can I solve this issue?

Upvotes: 0

Views: 834

Answers (1)

Gary Russell

Reputation: 174799

Interesting; please open an issue on GitHub.

We should catch that exception if the policy is none.

In the meantime, you might be able to work around it by using the regular client just once, to set an initial offset for the group (you don't actually have to receive any messages; just get the partitions assigned and set the initial position for the group).
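A rough sketch of that one-off step with the plain Apache Kafka consumer follows. It is untested against this setup and rests on several assumptions: kafka-clients 2.4 or later on the classpath (for committed(Set)), a placeholder broker address of localhost:9092, the topic name DocumentEvents and group id document-event-consumer inferred from the log above, and "earliest" as the desired starting point. The class name InitGroupOffsets is hypothetical. It assigns the partitions manually rather than subscribing, so no records are fetched.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Hypothetical one-off utility: run once before starting the real consumers.
public class InitGroupOffsets {

    public static void main(String[] args) {
        String topic = "DocumentEvents"; // assumed from the partition name in the log

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "document-event-consumer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Look up every partition of the topic and assign them manually.
            Set<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                    .map(p -> new TopicPartition(p.topic(), p.partition()))
                    .collect(Collectors.toSet());
            consumer.assign(partitions);

            // Only touch partitions that have no committed offset for this group.
            Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(partitions);
            Set<TopicPartition> missing = partitions.stream()
                    .filter(tp -> committed.get(tp) == null)
                    .collect(Collectors.toSet());

            // Commit the earliest available offset as the group's initial position.
            Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
            consumer.beginningOffsets(missing)
                    .forEach((tp, offset) -> toCommit.put(tp, new OffsetAndMetadata(offset)));
            if (!toCommit.isEmpty()) {
                consumer.commitSync(toCommit);
            }
        }
    }
}
```

Once the group has a committed offset for every partition, the reset policy should no longer be consulted on assignment, so the "none" override stops mattering for that group. Run this while no other member of the group is active.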

Upvotes: 0
