Dorian Pavetić
Dorian Pavetić

Reputation: 115

Spring Boot Kafka - Avro SerializationException infinite loop

I have a Spring Boot consumer that consumes Avro messages, and I have configured it as a retry topic with a DLT. When a SerializationException occurs, it causes an infinite loop and produces a huge amount of logs. As I have read, SerializationExceptions should not be retried, so I am puzzled why this happens.

How can I fix this and prevent it from filling up the logs? And how do I properly handle that serialization exception — for example, by storing those messages in a persistence layer?

This is my config:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    private final Integer concurrency;
    private final Boolean enableObservation;

    /**
     * Constructor injection of the listener settings.
     *
     * NOTE: the original code put {@code @Value} on final fields while Lombok's
     * {@code @AllArgsConstructor} generated the constructor. Lombok does NOT copy
     * {@code @Value} onto the generated constructor parameters, so Spring tries to
     * autowire an {@code Integer}/{@code Boolean} bean instead of resolving the
     * property placeholder, and startup fails (or the values are never injected).
     * Annotating the constructor parameters explicitly fixes this.
     */
    public KafkaConsumerConfig(
            @Value("${spring.kafka.listener.concurrency:1}") Integer concurrency,
            @Value("${spring.kafka.listener.enable-observation:true}") Boolean enableObservation) {
        this.concurrency = concurrency;
        this.enableObservation = enableObservation;
    }

    /**
     * Listener container factory for Avro {@code SpecificRecord} consumers.
     *
     * @param consumerFactory the Boot-auto-configured consumer factory (carries the
     *                        ErrorHandlingDeserializer configuration from application.yml)
     * @return a concurrent container factory with concurrency and observation applied
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<SpecificRecord, SpecificRecord> kafkaListenerContainerFactory(
            ConsumerFactory<SpecificRecord, SpecificRecord> consumerFactory
    ) {
        ConcurrentKafkaListenerContainerFactory<SpecificRecord, SpecificRecord> factory =
          new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setObservationEnabled(enableObservation);

        return factory;
    }
}
@EnableKafka
@Configuration
public class KafkaProducerConfig {

    private final Boolean enableObservation;
    private final String defaultTopic;

    /**
     * Constructor injection of the template settings.
     *
     * NOTE: the original code combined {@code @Value} on final fields with Lombok's
     * {@code @AllArgsConstructor}. Lombok does not propagate {@code @Value} to the
     * generated constructor parameters, so the placeholders are never resolved.
     * Annotating the constructor parameters explicitly fixes this.
     */
    public KafkaProducerConfig(
            @Value("${spring.kafka.template.enable-observation:true}") Boolean enableObservation,
            @Value("${spring.kafka.template.default-topic}") String defaultTopic) {
        this.enableObservation = enableObservation;
        this.defaultTopic = defaultTopic;
    }

    /**
     * Avro-serializing template used both for normal publishing and (via the
     * retry-topic infrastructure) for DLT publication.
     *
     * NOTE(review): the attached logs show DLT publication failing because a record
     * that failed deserialization is forwarded to the DLT as a raw byte[], which
     * KafkaAvroSerializer rejects ("Incompatible schema ... for schema \"bytes\"").
     * To make DLT publication of poison pills reliable, register an additional
     * KafkaTemplate backed by ByteArraySerializer and point the retry-topic
     * configuration at it — TODO confirm against the spring-kafka retry-topic docs.
     *
     * @param producerFactory the Boot-auto-configured producer factory
     * @return template with observation and default topic applied
     */
    @Bean
    public KafkaTemplate<SpecificRecord, SpecificRecord> kafkaTemplate(
            ProducerFactory<SpecificRecord, SpecificRecord> producerFactory
    ) {
        KafkaTemplate<SpecificRecord, SpecificRecord> kafkaTemplate =
                new KafkaTemplate<>(producerFactory);
        kafkaTemplate.setObservationEnabled(enableObservation);
        kafkaTemplate.setDefaultTopic(defaultTopic);
        return kafkaTemplate;
    }
}
@Component
@RetryableTopic(
        backoff = @Backoff(
                delayExpression = "${merchant.kafka.notification.backoff-delay}"
        ),
        attempts = "${merchant.kafka.notification.topic.max-retry-attempts}",
        dltStrategy = DltStrategy.FAIL_ON_ERROR,
        numPartitions = "${merchant.kafka.notification.retry-topic.partitions}",
        replicationFactor = "${merchant.kafka.notification.retry-topic.replication-factor}",
        concurrency = "${merchant.kafka.notification.topic.concurrency}"
)
@KafkaListener(
        topics = "${merchant.kafka.notification.topic.name}",
        containerFactory = "kafkaListenerContainerFactory",
        concurrency = "${merchant.kafka.notification.topic.concurrency}"
)
public class KafkaNotificationConsumer {

    @KafkaHandler
    public void handleNotification(
            @NonNull @Payload SomeAvroObject data,
            @NonNull @Headers KafkaMessageHeaders headers
    ) {
         // .. Process
    }

    @DltHandler
    public void processMessage(@NonNull SomeAvroObject data) {
        log.error("Could not process ");
    }

    @KafkaHandler(isDefault = true)
    public void unknown(Object object) {
        log.error("Unknown object type was received on consumer: {}", object);
    }

}
spring:
  kafka:
    listener:
      concurrency: 2
      observation-enabled: true
    bootstrap-servers: localhost:9092,localhost:9082,localhost:9072
    consumer:
      group-id: ${spring.application.name}
      # ErrorHandlingDeserializer wraps the real deserializer so that poison pills
      # surface as a DeserializationException on the listener thread instead of
      # killing the consumer; the failed record's value is kept as raw bytes.
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      auto-offset-reset: earliest
      properties:
        # Delegates actually doing the Avro decoding behind the error handler.
        spring.deserializer.key.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
        spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
        specific.avro.reader: true
    producer:
      client-id: ${merchant.kafka.appName}
      # NOTE(review): this single Avro producer is also what the retry-topic
      # infrastructure uses for DLT publication. A record that failed
      # deserialization is forwarded to the DLT as raw byte[], which
      # KafkaAvroSerializer under use.latest.version=true rejects — the attached
      # log says: 'Incompatible schema ... for schema "bytes". Set
      # latest.compatibility.strict=false to disable this check'. A separate
      # byte[]-serializing template for DLT publication (or that flag) is needed
      # to stop the failing publish loop — TODO confirm.
      key-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        auto.register.schemas: false
        use.latest.version: true
    properties:
      schema.registry.url: http://localhost:8072
    template:
      default-topic: ${merchant.kafka.notification.topic.name}
      observation-enabled: true

Log:

2025-02-20T15:51:48.708+01:00 ERROR 21780 --- [ntainer#1-0-C-1] [67b714a4e4cacabf56651fb7ea287fae-ac97847a6a6d9cb7] k.r.DeadLetterPublishingRecovererFactory : Record: topic = some-topic-name, partition = 1, offset = 0, main topic = some-topic-name threw an error at topic some-topic-name and won't be retried. Sending to DLT with name some-topic-name-dlt.

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2873) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.checkDeser(KafkaMessageListenerContainer.java:2921) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2773) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2701) ~[spring-kafka-3.2.0.jar:3.2.0]
    at io.micrometer.observation.Observation.observe(Observation.java:565) ~[micrometer-observation-1.13.0.jar:1.13.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2699) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2541) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2430) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2085) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1461) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1426) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1296) ~[spring-kafka-3.2.0.jar:3.2.0]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1804) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize
    at org.springframework.kafka.support.serializer.SerializationUtils.deserializationException(SerializationUtils.java:158) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer.deserialize(ErrorHandlingDeserializer.java:218) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73) ~[kafka-clients-3.7.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:321) ~[kafka-clients-3.7.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:283) ~[kafka-clients-3.7.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.FetchCollector.fetchRecords(FetchCollector.java:168) ~[kafka-clients-3.7.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.FetchCollector.collectFetch(FetchCollector.java:134) ~[kafka-clients-3.7.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.collectFetch(Fetcher.java:145) ~[kafka-clients-3.7.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.pollForFetches(LegacyKafkaConsumer.java:693) ~[kafka-clients-3.7.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:617) ~[kafka-clients-3.7.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:590) ~[kafka-clients-3.7.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874) ~[kafka-clients-3.7.0.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1625) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1600) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1405) ~[spring-kafka-3.2.0.jar:3.2.0]
    ... 4 common frames omitted
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 12
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.read(AbstractKafkaAvroDeserializer.java:542) ~[kafka-avro-serializer-7.6.0.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:188) ~[kafka-avro-serializer-7.6.0.jar:na]
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:108) ~[kafka-avro-serializer-7.6.0.jar:na]
    at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer.deserialize(ErrorHandlingDeserializer.java:215) ~[spring-kafka-3.2.0.jar:3.2.0]
    ... 17 common frames omitted
Caused by: java.lang.IndexOutOfBoundsException: Invalid index: 13
    at hr.kafka.model.SomeObject.put(SomeObject.java:179) ~[classes/:na]
    at org.apache.avro.generic.GenericData.setField(GenericData.java:851) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154) ~[avro-1.11.3.jar:1.11.3]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.read(AbstractKafkaAvroDeserializer.java:510) ~[kafka-avro-serializer-7.6.0.jar:na]
    ... 20 common frames omitted







2025-02-20T15:51:49.324+01:00 ERROR 21780 --- [ntainer#1-0-C-1] [67b714a4e4cacabf56651fb7ea287fae-ac97847a6a6d9cb7] o.s.k.l.DeadLetterPublishingRecoverer    : Dead-letter publication to some-topic-name-dlt failed for: some-topic-name-1@0

org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:174) ~[kafka-avro-serializer-7.6.0.jar:na]
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:68) ~[kafka-avro-serializer-7.6.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1017) ~[kafka-clients-3.7.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:964) ~[kafka-clients-3.7.0.jar:na]
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1103) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:805) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:773) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:576) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publish(DeadLetterPublishingRecoverer.java:689) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.send(DeadLetterPublishingRecoverer.java:598) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.sendOrThrow(DeadLetterPublishingRecoverer.java:564) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:533) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.FailedRecordTracker.attemptRecovery(FailedRecordTracker.java:228) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.FailedRecordTracker.recovered(FailedRecordTracker.java:188) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.SeekUtils.lambda$doSeeks$5(SeekUtils.java:108) ~[spring-kafka-3.2.0.jar:3.2.0]
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) ~[na:na]
    at org.springframework.kafka.listener.SeekUtils.doSeeks(SeekUtils.java:105) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:226) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:168) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2836) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2713) ~[spring-kafka-3.2.0.jar:3.2.0]
    at io.micrometer.observation.Observation.observe(Observation.java:565) ~[micrometer-observation-1.13.0.jar:1.13.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2699) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2541) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2430) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2085) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1461) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1426) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1296) ~[spring-kafka-3.2.0.jar:3.2.0]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1804) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: java.io.IOException: Incompatible schema ["hr.kafka.model.SomeObject"] with refs [{name='hr.kafka.model.SomeObject', subject='some-object-data', version=1}] of type AVRO for schema "bytes". Set latest.compatibility.strict=false to disable this check
    at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupLatestVersion(AbstractKafkaSchemaSerDe.java:587) ~[kafka-schema-serializer-7.6.0.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupLatestVersion(AbstractKafkaSchemaSerDe.java:554) ~[kafka-schema-serializer-7.6.0.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:138) ~[kafka-avro-serializer-7.6.0.jar:na]
    ... 31 common frames omitted

2025-02-20T15:51:49.328+01:00 ERROR 21780 --- [ntainer#1-0-C-1] [67b714a4e4cacabf56651fb7ea287fae-ac97847a6a6d9cb7] o.s.kafka.listener.DefaultErrorHandler   : Failed to determine if this record (some-topic-name-1@0) should be recovered, including in seeks

org.springframework.kafka.KafkaException: Dead-letter publication to some-topic-name-dlt failed for: some-topic-name-1@0
    at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.verifySendResult(DeadLetterPublishingRecoverer.java:721) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publish(DeadLetterPublishingRecoverer.java:704) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.send(DeadLetterPublishingRecoverer.java:598) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.sendOrThrow(DeadLetterPublishingRecoverer.java:564) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:533) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.FailedRecordTracker.attemptRecovery(FailedRecordTracker.java:228) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.FailedRecordTracker.recovered(FailedRecordTracker.java:188) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.SeekUtils.lambda$doSeeks$5(SeekUtils.java:108) ~[spring-kafka-3.2.0.jar:3.2.0]
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) ~[na:na]
    at org.springframework.kafka.listener.SeekUtils.doSeeks(SeekUtils.java:105) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:226) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:168) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2836) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2713) ~[spring-kafka-3.2.0.jar:3.2.0]
    at io.micrometer.observation.Observation.observe(Observation.java:565) ~[micrometer-observation-1.13.0.jar:1.13.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2699) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2541) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2430) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2085) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1461) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1426) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1296) ~[spring-kafka-3.2.0.jar:3.2.0]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1804) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

Upvotes: 1

Views: 30

Answers (0)

Related Questions