Reputation: 1
I am using Spring Kafka to consume messages produced with LinkedIn's large-message-enabled Kafka client (com.linkedin.kafka.clients). This client always overrides AUTO_OFFSET_RESET_CONFIG to "none", as shown in its constructor:
private LiKafkaConsumerImpl(LiKafkaConsumerConfig configs,
                            Deserializer<K> keyDeserializer,
                            Deserializer<V> valueDeserializer,
                            Deserializer<LargeMessageSegment> largeMessageSegmentDeserializer,
                            Auditor<K, V> consumerAuditor) {
    // ... (other initialization elided)
    _kafkaConsumer = new KafkaConsumer<>(configs.configForVanillaConsumer(),
                                         byteArrayDeserializer,
                                         byteArrayDeserializer);
    // ...
}

Map<String, Object> configForVanillaConsumer() {
    Map<String, Object> newConfigs = new HashMap<>();
    newConfigs.putAll(this.originals());
    newConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    newConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
    return newConfigs;
}
So, once I start using batch commit and set ENABLE_AUTO_COMMIT_CONFIG to false, it throws the following error:
[org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener com.linkedin.kafka.clients.consumer.LiKafkaConsumerRebalanceListener for group document-event-consumer failed on partition assignment
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: DocumentEvents-2
    at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:369)
    at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:247)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1602)
    at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1265)
    at com.linkedin.kafka.clients.consumer.LiKafkaConsumerImpl.position(LiKafkaConsumerImpl.java:403)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$1.onPartitionsAssigned(KafkaMessageListenerContainer.java:447)
    at com.linkedin.kafka.clients.consumer.LiKafkaConsumerRebalanceListener.onPartitionsAssigned(LiKafkaConsumerRebalanceListener.java:62)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at com.linkedin.kafka.clients.consumer.LiKafkaConsumerImpl.poll(LiKafkaConsumerImpl.java:231)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:558)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:745)
This issue happens because it is the first time this consumer group consumes messages from this topic, so the consumer falls back to the offset reset policy. Although I set that policy to "earliest", the underlying LinkedIn Kafka client overrides it to "none".
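For context, this is roughly how I set the policy in my consumer configuration (a sketch; the rest of the factory setup is omitted):

Map<String, Object> props = new HashMap<>();
// My intended reset policy; configForVanillaConsumer() above replaces it with "none".
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");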
I also tried overriding the ConsumerRebalanceListener to manually seek to the beginning in this case, but execution never actually reaches that point.
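This is roughly the kind of listener I tried (a sketch; the wiring into the container is omitted). Judging from the stack trace, it never runs because the exception is thrown from the container's own position() call during partition assignment:

import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Sketch of the seek-to-beginning fallback I attempted.
public class SeekToBeginningListener implements ConsumerRebalanceListener {

    private final Consumer<?, ?> consumer;

    public SeekToBeginningListener(Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // nothing to do on revocation
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Intended to stand in for the "earliest" reset policy, but the
        // NoOffsetForPartitionException above fires before this point.
        consumer.seekToBeginning(partitions);
    }
}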
How can I solve this issue?
Upvotes: 0
Views: 834
Reputation: 174799
Interesting; please open an issue in GitHub.
We should catch that exception if the policy is none.
In the meantime, you might be able to work around it by using the regular client just once, to actually set an initial offset for the group (you don't actually have to receive any messages, just get the partitions assigned and set the initial position for the group).
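Something along these lines should do it; this is a minimal sketch with the vanilla Apache client, using manual assignment instead of a subscribe/poll cycle so nothing is actually consumed. The bootstrap address is a placeholder; the group id and topic name are taken from your stack trace:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class GroupOffsetBootstrap {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "document-event-consumer"); // same group as the container
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Manual assignment: no rebalance needed, no records fetched.
            List<TopicPartition> partitions = consumer.partitionsFor("DocumentEvents").stream()
                    .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
                    .collect(Collectors.toList());
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);

            // Commit the beginning offsets so the group has a committed position
            // and the "none" reset policy never triggers again.
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (TopicPartition tp : partitions) {
                offsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
            }
            consumer.commitSync(offsets);
        }
    }
}

Run this once before starting the LiKafka-based listener container; after the offsets are committed, the group always has a valid position and the reset policy is never consulted.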
Upvotes: 0