Sach

Reputation: 51

Spring Cloud Stream Kafka DLQ throws RecordTooLargeException in batch mode

I am using the Spring Cloud Stream Kafka binder in my project. We have enabled batch mode (batch-mode: true) with a batch size of 500 messages. We use DeadLetterPublishingRecoverer to send failed messages to the DLQ: if any message fails, we throw a BatchListenerFailedException. This worked fine and messages were being sent to the DLQ.
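For reference, our binding is configured roughly like this (the binding name follows the function bean name; using max.poll.records for the 500-message batch size is my assumption):

spring.cloud.stream.bindings.processMessages-in-0.group=group1
spring.cloud.stream.bindings.processMessages-in-0.consumer.batch-mode=true
# batch size of 500 (assumed to be controlled via max.poll.records)
spring.kafka.consumer.properties.max.poll.records=500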

Recently we hit an issue where, while sending a message to the DLQ, Kafka throws "RecordTooLargeException: Message is 15762601 bytes when serialized which is larger than 1048576", even though our individual messages are small (their size is measured in bytes). What I observed is that Kafka reads the messages in a batch of 500, and if any one message fails, all 500 messages get wrapped into the exception and placed into the Kafka headers "kafka_dlt-exception-message" and "kafka_dlt-exception-stacktrace". Because of this, the DLQ record grows far past the limit.

I can remove those two headers ("kafka_dlt-exception-message" and "kafka_dlt-exception-stacktrace") to get rid of this issue, but my question is: is this the right way to handle it?
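For reference, this is roughly how I would remove them, by overriding publish() on the DeadLetterPublishingRecoverer (untested sketch; only the header removal is new):

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template) {

    @Override
    protected void publish(ProducerRecord<Object, Object> outRecord,
            KafkaOperations<Object, Object> kafkaTemplate, ConsumerRecord<?, ?> inRecord) {

        // strip the oversized exception headers before the DLT record is serialized
        outRecord.headers().remove(KafkaHeaders.DLT_EXCEPTION_MESSAGE);
        outRecord.headers().remove(KafkaHeaders.DLT_EXCEPTION_STACKTRACE);
        super.publish(outRecord, kafkaTemplate, inRecord);
    }
};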

Below is the error stack trace I am getting. I am using Spring Cloud Stream version 3.2.6.

Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@2a4bc0e4]; nested exception is org.springframework.kafka.listener.BatchListenerFailedException: Exception while generating outbound message; nested exception is java.lang.NullPointerException, failedMessage=GenericMessage [payload=[{"message":{src:"test"}}, {"message":{src:"test1"}}, {"message":{src:"test2"}}, headers={skip-input-typeconversion=false,kafka_offset=[2193, 2194, 2195],kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4730c07,  kafka_conversionFailures=[], kafka_timestampType=[CREATE_TIME, CREATE_TIME, CREATE_TIME], kafka_receivedPartitionId=[2, 2, 2], kafka_receivedMessageKey=[test, test1, test2], kafka_batchConvertedHeaders=[{}, {}, {}], kafka_receivedTopic=[input-topic, input-topic, input-topic], kafka_receivedTimestamp=[1677649913630, 1677649913629,1677649913629], contentType=application/json, kafka_groupId=group1}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:108)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)

Below is the consumer function:

@Bean
Consumer<Message<String>> processMessages() {
    return input -> {
        try {
            // processing logic
        } catch (CustomException exp) {
            // getIndex() returns the index of the failed message within the batch
            throw new BatchListenerFailedException(exp.getMessage(), exp.getCause(), exp.getIndex());
        }
    };
}

After debugging the issue I observed that in AbstractMessageHandler -> handleMessage, inside the exception block, the code passes all the messages (the whole batch) to the MessageHandlingException, where I thought it should pass only the failed message (I may be wrong). Because of that, the headers "kafka_dlt-exception-message" and "kafka_dlt-exception-stacktrace" contain all the messages, and the record size grows.
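To illustrate what I mean, the whole failed batch is embedded in the exception's toString(), which is what ends up in those DLT headers (minimal sketch, not our actual code):

// MessagingException#toString() appends ", failedMessage=GenericMessage [payload=[...], ...]",
// so the entire batch payload is repeated in the exception text
Message<?> batch = MessageBuilder.withPayload(List.of("test", "test1", "test2")).build();
MessageHandlingException exp = new MessageHandlingException(batch, "error occurred in message handler");
System.out.println(exp); // prints the description plus the full batch payload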

Thanks

Upvotes: 0

Views: 944

Answers (1)

Gary Russell

Reputation: 174759

This works as expected; the stack trace is as it should be; please edit the question to show your code/config so we can see what's wrong...

@SpringBootApplication
public class So75594534Application {

    public static void main(String[] args) {
        SpringApplication.run(So75594534Application.class, args);
    }

    @Bean
    Consumer<List<String>> consumer() {
        return list -> {
            System.out.println(list);
            if (list.size() > 1) {
                throw new BatchListenerFailedException("test", 1);
            }
        };
    }

    @Bean
    NewTopic dlt() {
        return TopicBuilder.name("consumer-in-0.DLT").partitions(1).replicas(1).build();
    }

    @Bean
    ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> cust(
            KafkaTemplate<byte[], byte[]> template) {

        return (container, group, dest) -> {
            container.setCommonErrorHandler(new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template) {

                @Override
                protected void publish(ProducerRecord<Object, Object> outRecord,
                                       KafkaOperations<Object, Object> kafkaTemplate, ConsumerRecord<?, ?> inRecord) {

                    super.publish(outRecord, kafkaTemplate, inRecord);
                    System.out.println("DLT Stack Trace: \n" +
                            new String(outRecord.headers().lastHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE).value()));
                }
            }, new FixedBackOff(0L, 0L)));
        };

    }

    @Bean
    ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            template.send("consumer-in-0", "test1".getBytes());
            template.send("consumer-in-0", "test2".getBytes());
            template.send("consumer-in-0", "test3".getBytes());
        };
    }

}
spring.cloud.stream.bindings.consumer-in-0.group=foo
spring.cloud.stream.bindings.consumer-in-0.consumer.batch-mode=true

spring.kafka.producer.properties.linger.ms=1000

Result:

[test1, test2, test3]
DLT Stack Trace: 
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2945)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2487)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:2463)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:2402)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:2294)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:2187)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2166)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1524)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1488)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1363)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@1587fead], failedMessage=GenericMessage [payload=[[B@7f544fe8, [B@7e09af6, [B@281b074c], headers={kafka_offset=[15, 16, 17], kafka_conversionFailures=[], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7314418f, kafka_timestampType=[CREATE_TIME, CREATE_TIME, CREATE_TIME], kafka_receivedPartitionId=[0, 0, 0], kafka_receivedMessageKey=[null, null, null], kafka_batchConvertedHeaders=[{}, {}, {}], kafka_receivedTopic=[consumer-in-0, consumer-in-0, consumer-in-0], kafka_receivedTimestamp=[1677603279132, 1677603279140, 1677603279140], contentType=application/json, kafka_groupId=foo}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:108)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:373)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:327)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:297)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.lambda$sendMessage$1(MessageProducerSupport.java:262)
    at io.micrometer.observation.Observation.observe(Observation.java:492)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:394)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationBatchMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:568)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationBatchMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:540)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2473)
    ... 10 more
Caused by: org.springframework.kafka.listener.BatchListenerFailedException: test @-1
    at com.example.demo.So75594534Application.lambda$consumer$0(So75594534Application.java:37)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeConsumer(SimpleFunctionRegistry.java:973)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:684)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:533)
    at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:84)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:785)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:621)
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
    ... 29 more

There's nothing in the stack trace about the batch.

Upvotes: 0
