Reputation: 51
I am using the Spring Cloud Stream Kafka binder in my project. We have enabled batch mode (batch-mode set to true) with a batch size of 500 messages. We use DeadLetterPublishingRecoverer to send failed messages to the DLQ: if any message fails, we throw a BatchListenerFailedException. This worked fine, and messages were being sent to the DLQ.
Recently we hit an issue where, while sending a message to the DLQ, Kafka throws "RecordTooLargeException: Message is 15762601 bytes when serialized which is larger than 1048576". Our individual messages are small (their size is in bytes). What I observed is that the messages are read in batches of 500 and, if any one message fails, all 500 messages are wrapped into the exception and put into the Kafka headers "kafka_dlt-exception-message" and "kafka_dlt-exception-stacktrace". Because of this, the size of the DLQ record grows far beyond the limit.
I can remove those two headers ("kafka_dlt-exception-message" and "kafka_dlt-exception-stacktrace") to get rid of the issue, but my question is: is this the right way to handle it?
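As an alternative to dropping the two headers entirely, their values could be capped at a fixed size before the record is published. The following is only a minimal sketch of that idea in plain Java: the class name HeaderCap, the method cap, and the 8192-byte limit are hypothetical, and wiring it into DeadLetterPublishingRecoverer is not shown here.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class HeaderCap {

    // Hypothetical helper: limits a UTF-8 encoded header value to at most
    // maxBytes, so an oversized exception message or stack trace cannot push
    // the record past the producer's size limit.
    public static byte[] cap(byte[] value, int maxBytes) {
        if (value == null || value.length <= maxBytes) {
            return value; // already small enough, return unchanged
        }
        int end = maxBytes;
        // Avoid cutting in the middle of a multi-byte UTF-8 character:
        // step back while the first excluded byte is a continuation byte.
        while (end > 0 && (value[end] & 0xC0) == 0x80) {
            end--;
        }
        return Arrays.copyOf(value, end);
    }

    public static void main(String[] args) {
        // Simulated header value with the size reported in the exception.
        byte[] trace = "x".repeat(15_762_601).getBytes(StandardCharsets.UTF_8);
        byte[] capped = cap(trace, 8_192); // 8192 is an arbitrary example limit
        System.out.println(trace.length + " -> " + capped.length);
    }
}
```

Truncating keeps the start of the exception message and stack trace, which is usually the most useful part for diagnosing why a record was dead-lettered, while bounding the record size.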
Below is the error stack trace I am getting. I am using Spring Cloud Stream version 3.2.6.
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@2a4bc0e4]; nested exception is org.springframework.kafka.listener.BatchListenerFailedException: Exception while generating outbound message; nested exception is java.lang.NullPointerException, failedMessage=GenericMessage [payload=[{"message":{src:"test"}}, {"message":{src:"test1"}}, {"message":{src:"test2"}}, headers={skip-input-typeconversion=false,kafka_offset=[2193, 2194, 2195],kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4730c07, kafka_conversionFailures=[], kafka_timestampType=[CREATE_TIME, CREATE_TIME, CREATE_TIME], kafka_receivedPartitionId=[2, 2, 2], kafka_receivedMessageKey=[test, test1, test2], kafka_batchConvertedHeaders=[{}, {}, {}], kafka_receivedTopic=[input-topic, input-topic, input-topic], kafka_receivedTimestamp=[1677649913630, 1677649913629,1677649913629], contentType=application/json, kafka_groupId=group1}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:108)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
Below is the consumer function:
@Bean
Consumer<Message<String>> processMessages() {
    return input -> {
        try {
            // Processing logic
        } catch (CustomException exp) {
            // getIndex() returns the index of the failed message within the batch
            throw new BatchListenerFailedException(exp.getMessage(), exp.getCause(), exp.getIndex());
        }
    };
}
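To make it clearer where the index passed to the exception can come from, here is a minimal, self-contained sketch of processing a batch record by record and reporting the position of the first failure. IndexedBatchFailure is a hypothetical stand-in for BatchListenerFailedException (it is not the Spring class), and process is a placeholder for the real processing logic.

```java
import java.util.List;

public class BatchIndexSketch {

    // Hypothetical stand-in for BatchListenerFailedException: carries the
    // index of the record that failed within the batch.
    public static class IndexedBatchFailure extends RuntimeException {
        private final int index;

        public IndexedBatchFailure(String message, Throwable cause, int index) {
            super(message, cause);
            this.index = index;
        }

        public int getIndex() {
            return index;
        }
    }

    // Placeholder for the real processing logic; rejects empty records.
    static void process(String record) {
        if (record.isEmpty()) {
            throw new IllegalArgumentException("empty record");
        }
    }

    // Processes records in order and reports the first failing position.
    public static void processBatch(List<String> batch) {
        for (int i = 0; i < batch.size(); i++) {
            try {
                process(batch.get(i));
            } catch (RuntimeException e) {
                // Keep the message short: only the index, not the batch contents.
                throw new IndexedBatchFailure("record " + i + " failed", e, i);
            }
        }
    }

    public static void main(String[] args) {
        try {
            processBatch(List.of("a", "", "c"));
        } catch (IndexedBatchFailure e) {
            System.out.println("failed index: " + e.getIndex());
        }
    }
}
```

Note that the exception message here contains only the index, so the exception itself does not carry the batch payload.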
After debugging the issue, I observed that in the exception block of AbstractMessageHandler.handleMessage, the code passes all the messages to MessageHandlingException. I expected it to pass only the failed message (I may be wrong). As a result, the headers "kafka_dlt-exception-message" and "kafka_dlt-exception-stacktrace" contain all the messages, and the message size grows.
Thanks
Upvotes: 0
Views: 944
Reputation: 174759
This works as expected; the stack trace is as it should be; please edit the question to show your code/config so we can see what's wrong...
@SpringBootApplication
public class So75594534Application {

    public static void main(String[] args) {
        SpringApplication.run(So75594534Application.class, args);
    }

    @Bean
    Consumer<List<String>> consumer() {
        return list -> {
            System.out.println(list);
            if (list.size() > 1) {
                throw new BatchListenerFailedException("test", 1);
            }
        };
    }

    @Bean
    NewTopic dlt() {
        return TopicBuilder.name("consumer-in-0.DLT").partitions(1).replicas(1).build();
    }

    @Bean
    ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> cust(
            KafkaTemplate<byte[], byte[]> template) {
        return (container, group, dest) -> {
            container.setCommonErrorHandler(new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template) {
                @Override
                protected void publish(ProducerRecord<Object, Object> outRecord,
                        KafkaOperations<Object, Object> kafkaTemplate, ConsumerRecord<?, ?> inRecord) {
                    super.publish(outRecord, kafkaTemplate, inRecord);
                    System.out.println("DLT Stack Trace: \n" +
                            new String(outRecord.headers().lastHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE).value()));
                }
            }, new FixedBackOff(0L, 0L)));
        };
    }

    @Bean
    ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            template.send("consumer-in-0", "test1".getBytes());
            template.send("consumer-in-0", "test2".getBytes());
            template.send("consumer-in-0", "test3".getBytes());
        };
    }
}
spring.cloud.stream.bindings.consumer-in-0.group=foo
spring.cloud.stream.bindings.consumer-in-0.consumer.batch-mode=true
spring.kafka.producer.properties.linger.ms=1000
Result:
[test1, test2, test3]
DLT Stack Trace:
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2945)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2487)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:2463)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:2402)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:2294)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:2187)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2166)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1524)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1488)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1363)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@1587fead], failedMessage=GenericMessage [payload=[[B@7f544fe8, [B@7e09af6, [B@281b074c], headers={kafka_offset=[15, 16, 17], kafka_conversionFailures=[], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7314418f, kafka_timestampType=[CREATE_TIME, CREATE_TIME, CREATE_TIME], kafka_receivedPartitionId=[0, 0, 0], kafka_receivedMessageKey=[null, null, null], kafka_batchConvertedHeaders=[{}, {}, {}], kafka_receivedTopic=[consumer-in-0, consumer-in-0, consumer-in-0], kafka_receivedTimestamp=[1677603279132, 1677603279140, 1677603279140], contentType=application/json, kafka_groupId=foo}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:108)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:373)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:327)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:297)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.lambda$sendMessage$1(MessageProducerSupport.java:262)
at io.micrometer.observation.Observation.observe(Observation.java:492)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:394)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationBatchMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:568)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationBatchMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:540)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2473)
... 10 more
Caused by: org.springframework.kafka.listener.BatchListenerFailedException: test @-1
at com.example.demo.So75594534Application.lambda$consumer$0(So75594534Application.java:37)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeConsumer(SimpleFunctionRegistry.java:973)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:684)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:533)
at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:84)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:785)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:621)
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
... 29 more
There's nothing in the stack trace about the batch.
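One possible reason the trace above stays small, offered as an assumption rather than something established in this thread: the failedMessage payload here is a list of byte[] values, and Java renders a byte[] as a short type-and-hash token instead of its contents, whereas a list of String payloads (as in the question's trace) is rendered in full. A minimal plain-Java sketch of that difference, with the hypothetical class name PayloadToStringSketch:

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

public class PayloadToStringSketch {

    // Mimics how a payload list ends up embedded in an exception message
    // via string concatenation (which calls toString on the list).
    public static String render(List<?> payload) {
        return "payload=" + payload;
    }

    public static void main(String[] args) {
        List<byte[]> raw = List.of(
                "test1".getBytes(StandardCharsets.UTF_8),
                "test2".getBytes(StandardCharsets.UTF_8));
        List<String> text = List.of("test1", "test2");

        // byte[] elements print as "[B@<hash>", so the contents are not included.
        System.out.println(render(raw));
        // String elements print their full contents, so the size grows with the batch.
        System.out.println(render(text));
    }
}
```

If that is what happens in the question's setup, the size of the exception message would scale with the total size of the batch whenever the payloads are strings.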
Upvotes: 0