Reputation: 115
I have Spring Boot consumer that consumes Avro messages and I have configured it as retry topic with DLT. When SerializationException
occur, it causes infinite loop and it produces huge amount of logs..
As I have read, SerializationException
s should not be retried, so I am puzzled why this happens.
How could I fix it, and prevent it from filling up logs? And how to properly handle that serialization exception and maybe store those messages in persistance layer or something?
This is my config:
@EnableKafka
@Configuration
@AllArgsConstructor
public class KafkaConsumerConfig {
@Value(value = "${spring.kafka.listener.concurrency:1}")
private final Integer concurrency;
@Value("${spring.kafka.listener.enable-observation:true}")
private final Boolean enableObservation;
@Bean
public ConcurrentKafkaListenerContainerFactory<SpecificRecord, SpecificRecord> kafkaListenerContainerFactory(
ConsumerFactory<SpecificRecord, SpecificRecord> consumerFactory
) {
ConcurrentKafkaListenerContainerFactory<SpecificRecord, SpecificRecord> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(concurrency);
factory.getContainerProperties().setObservationEnabled(enableObservation);
return factory;
}
}
@EnableKafka
@Configuration
@AllArgsConstructor
public class KafkaProducerConfig {
@Value("${spring.kafka.template.enable-observation:true}")
private final Boolean enableObservation;
@Value("${spring.kafka.template.default-topic}")
private final String defaultTopic;
@Bean
public KafkaTemplate<SpecificRecord, SpecificRecord> kafkaTemplate(
ProducerFactory<SpecificRecord, SpecificRecord> producerFactory
) {
KafkaTemplate<SpecificRecord, SpecificRecord> kafkaTemplate =
new KafkaTemplate<>(producerFactory);
kafkaTemplate.setObservationEnabled(enableObservation);
kafkaTemplate.setDefaultTopic(defaultTopic);
return kafkaTemplate;
}
}
@Component
@RetryableTopic(
backoff = @Backoff(
delayExpression = "${merchant.kafka.notification.backoff-delay}"
),
attempts = "${merchant.kafka.notification.topic.max-retry-attempts}",
dltStrategy = DltStrategy.FAIL_ON_ERROR,
numPartitions = "${merchant.kafka.notification.retry-topic.partitions}",
replicationFactor = "${merchant.kafka.notification.retry-topic.replication-factor}",
concurrency = "${merchant.kafka.notification.topic.concurrency}"
)
@KafkaListener(
topics = "${merchant.kafka.notification.topic.name}",
containerFactory = "kafkaListenerContainerFactory",
concurrency = "${merchant.kafka.notification.topic.concurrency}"
)
public class KafkaNotificationConsumer {
@KafkaHandler
public void handleNotification(
@NonNull @Payload SomeAvroObject data,
@NonNull @Headers KafkaMessageHeaders headers
) {
// .. Process
}
@DltHandler
public void processMessage(@NonNull SomeAvroObject data) {
log.error("Could not process ");
}
@KafkaHandler(isDefault = true)
public void unknown(Object object) {
log.error("Unknown object type was received on consumer: {}", object);
}
}
spring:
kafka:
listener:
concurrency: 2
observation-enabled: true
bootstrap-servers: localhost:9092,localhost:9082,localhost:9072
consumer:
group-id: ${spring.application.name}
key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
auto-offset-reset: earliest
properties:
spring.deserializer.key.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
specific.avro.reader: true
producer:
client-id: ${merchant.kafka.appName}
key-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
properties:
auto.register.schemas: false
use.latest.version: true
properties:
schema.registry.url: http://localhost:8072
template:
default-topic: ${merchant.kafka.notification.topic.name}
observation-enabled: true
Log:
2025-02-20T15:51:48.708+01:00 ERROR 21780 --- [ntainer#1-0-C-1] [67b714a4e4cacabf56651fb7ea287fae-ac97847a6a6d9cb7] k.r.DeadLetterPublishingRecovererFactory : Record: topic = some-topic-name, partition = 1, offset = 0, main topic = some-topic-name threw an error at topic some-topic-name and won't be retried. Sending to DLT with name some-topic-name-dlt.
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2873) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.checkDeser(KafkaMessageListenerContainer.java:2921) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2773) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2701) ~[spring-kafka-3.2.0.jar:3.2.0]
at io.micrometer.observation.Observation.observe(Observation.java:565) ~[micrometer-observation-1.13.0.jar:1.13.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2699) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2541) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2430) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2085) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1461) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1426) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1296) ~[spring-kafka-3.2.0.jar:3.2.0]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1804) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize
at org.springframework.kafka.support.serializer.SerializationUtils.deserializationException(SerializationUtils.java:158) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer.deserialize(ErrorHandlingDeserializer.java:218) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73) ~[kafka-clients-3.7.0.jar:na]
at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:321) ~[kafka-clients-3.7.0.jar:na]
at org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:283) ~[kafka-clients-3.7.0.jar:na]
at org.apache.kafka.clients.consumer.internals.FetchCollector.fetchRecords(FetchCollector.java:168) ~[kafka-clients-3.7.0.jar:na]
at org.apache.kafka.clients.consumer.internals.FetchCollector.collectFetch(FetchCollector.java:134) ~[kafka-clients-3.7.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.collectFetch(Fetcher.java:145) ~[kafka-clients-3.7.0.jar:na]
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.pollForFetches(LegacyKafkaConsumer.java:693) ~[kafka-clients-3.7.0.jar:na]
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:617) ~[kafka-clients-3.7.0.jar:na]
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:590) ~[kafka-clients-3.7.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874) ~[kafka-clients-3.7.0.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1625) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1600) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1405) ~[spring-kafka-3.2.0.jar:3.2.0]
... 4 common frames omitted
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 12
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.read(AbstractKafkaAvroDeserializer.java:542) ~[kafka-avro-serializer-7.6.0.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:188) ~[kafka-avro-serializer-7.6.0.jar:na]
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:108) ~[kafka-avro-serializer-7.6.0.jar:na]
at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer.deserialize(ErrorHandlingDeserializer.java:215) ~[spring-kafka-3.2.0.jar:3.2.0]
... 17 common frames omitted
Caused by: java.lang.IndexOutOfBoundsException: Invalid index: 13
at hr.kafka.model.SomeObject.put(SomeObject.java:179) ~[classes/:na]
at org.apache.avro.generic.GenericData.setField(GenericData.java:851) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154) ~[avro-1.11.3.jar:1.11.3]
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.read(AbstractKafkaAvroDeserializer.java:510) ~[kafka-avro-serializer-7.6.0.jar:na]
... 20 common frames omitted
2025-02-20T15:51:49.324+01:00 ERROR 21780 --- [ntainer#1-0-C-1] [67b714a4e4cacabf56651fb7ea287fae-ac97847a6a6d9cb7] o.s.k.l.DeadLetterPublishingRecoverer : Dead-letter publication to some-topic-name-dlt failed for: some-topic-name-1@0
org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:174) ~[kafka-avro-serializer-7.6.0.jar:na]
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:68) ~[kafka-avro-serializer-7.6.0.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1017) ~[kafka-clients-3.7.0.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:964) ~[kafka-clients-3.7.0.jar:na]
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1103) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:805) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:773) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:576) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publish(DeadLetterPublishingRecoverer.java:689) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.send(DeadLetterPublishingRecoverer.java:598) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.sendOrThrow(DeadLetterPublishingRecoverer.java:564) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:533) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.FailedRecordTracker.attemptRecovery(FailedRecordTracker.java:228) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.FailedRecordTracker.recovered(FailedRecordTracker.java:188) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.SeekUtils.lambda$doSeeks$5(SeekUtils.java:108) ~[spring-kafka-3.2.0.jar:3.2.0]
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) ~[na:na]
at org.springframework.kafka.listener.SeekUtils.doSeeks(SeekUtils.java:105) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:226) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:168) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2836) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2713) ~[spring-kafka-3.2.0.jar:3.2.0]
at io.micrometer.observation.Observation.observe(Observation.java:565) ~[micrometer-observation-1.13.0.jar:1.13.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2699) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2541) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2430) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2085) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1461) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1426) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1296) ~[spring-kafka-3.2.0.jar:3.2.0]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1804) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: java.io.IOException: Incompatible schema ["hr.kafka.model.SomeObject"] with refs [{name='hr.kafka.model.SomeObject', subject='some-object-data', version=1}] of type AVRO for schema "bytes". Set latest.compatibility.strict=false to disable this check
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupLatestVersion(AbstractKafkaSchemaSerDe.java:587) ~[kafka-schema-serializer-7.6.0.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupLatestVersion(AbstractKafkaSchemaSerDe.java:554) ~[kafka-schema-serializer-7.6.0.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:138) ~[kafka-avro-serializer-7.6.0.jar:na]
... 31 common frames omitted
2025-02-20T15:51:49.328+01:00 ERROR 21780 --- [ntainer#1-0-C-1] [67b714a4e4cacabf56651fb7ea287fae-ac97847a6a6d9cb7] o.s.kafka.listener.DefaultErrorHandler : Failed to determine if this record (some-topic-name-1@0) should be recovered, including in seeks
org.springframework.kafka.KafkaException: Dead-letter publication to some-topic-name-dlt failed for: some-topic-name-1@0
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.verifySendResult(DeadLetterPublishingRecoverer.java:721) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publish(DeadLetterPublishingRecoverer.java:704) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.send(DeadLetterPublishingRecoverer.java:598) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.sendOrThrow(DeadLetterPublishingRecoverer.java:564) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:533) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.FailedRecordTracker.attemptRecovery(FailedRecordTracker.java:228) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.FailedRecordTracker.recovered(FailedRecordTracker.java:188) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.SeekUtils.lambda$doSeeks$5(SeekUtils.java:108) ~[spring-kafka-3.2.0.jar:3.2.0]
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) ~[na:na]
at org.springframework.kafka.listener.SeekUtils.doSeeks(SeekUtils.java:105) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:226) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:168) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2836) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2713) ~[spring-kafka-3.2.0.jar:3.2.0]
at io.micrometer.observation.Observation.observe(Observation.java:565) ~[micrometer-observation-1.13.0.jar:1.13.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2699) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2541) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2430) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2085) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1461) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1426) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1296) ~[spring-kafka-3.2.0.jar:3.2.0]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1804) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Upvotes: 1
Views: 30