Reputation: 145
I have a simple consumer, and I want the input message to be sent to a DLQ if anything goes wrong at deserialization time or during execution of the consumer.
As described in the documentation, I'm using a DltAwareProcessor for exceptions thrown during processing:
@Bean
public Consumer<KStream<Object, MyAvroModel>> process(DltPublishingContext dltSenderContext) {
    return input -> input
            .process(() -> new DltAwareProcessor<>(rec -> {
                log.info("RECEIVED {}", rec);
                // DO stuff
                return rec;
            }, "process-dlq", dltSenderContext));
}
and the following configuration in application.yml:
cloud.stream:
  bindings:
    process-in-0:
      destination: process-topic
      group: process-group
      max-attempts: 1
      content-type: application/*+avro
  kafka:
    binder:
      brokers: ${KAFKA_HOSTS}
    streams:
      bindings:
        process-in-0:
          consumer:
            useNativeDecoding: true
            enableDlq: true
            dlqName: process-topic-dlq
      binder:
        deserializationExceptionHandler: sendToDlq
The application starts correctly, but when the consumer fails I get this exception:
stream-thread [process-group-18fbe5b4-f8a9-4192-85f2-ffec28756445-StreamThread-1] Failed to process stream task 0_33 due to the following error:
java.lang.IllegalStateException: A default binder has been requested, but there are no binders available for 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' : , and no default binder has been set.
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.doGetBinderConventional(DefaultBinderFactory.java:278)
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.doGetBinder(DefaultBinderFactory.java:180)
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinder(DefaultBinderFactory.java:167)
    at org.springframework.cloud.stream.binding.BindingService.getBinder(BindingService.java:419)
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:279)
    at org.springframework.cloud.stream.function.StreamBridge.resolveDestination(StreamBridge.java:272)
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:168)
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:147)
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:142)
    at org.springframework.cloud.stream.binder.kafka.streams.DltAwareProcessor.lambda$defaultProcessorRecordRecoverer$0(DltAwareProcessor.java:83)
    at org.springframework.cloud.stream.binder.kafka.streams.RecordRecoverableProcessor.process(RecordRecoverableProcessor.java:90)
I believe this is caused by the following behaviour described in the documentation:
This is because, DltAwareProcessor uses StreamBridge which uses the regular Kafka binder (message-channel based) which by default uses a ByteArraySerializer for keys.
I can work around that by adding the plain Kafka binder as a dependency and duplicating the Kafka configuration for it, but I then get an NPE after the message is sent to the DLQ:
ERROR o.a.k.s.p.internals.TaskExecutor - stream-thread [process-group-306cc04c-9994-46bf-8174-7226eebe1aa2-StreamThread-1] Failed to process stream task 0_26 due to the following error:
java.lang.NullPointerException: Cannot invoke "Object.getClass()" because "result" is null
    at org.springframework.cloud.function.cloudevent.CloudEventsFunctionInvocationHelper.doPostProcessResult(CloudEventsFunctionInvocationHelper.java:138)
    at org.springframework.cloud.function.cloudevent.CloudEventsFunctionInvocationHelper.postProcessResult(CloudEventsFunctionInvocationHelper.java:114)
    at org.springframework.cloud.function.cloudevent.CloudEventsFunctionInvocationHelper.postProcessResult(CloudEventsFunctionInvocationHelper.java:48)
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:193)
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:147)
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:142)
    at org.springframework.cloud.stream.binder.kafka.streams.DltAwareProcessor.lambda$defaultProcessorRecordRecoverer$0(DltAwareProcessor.java:83)
    at org.springframework.cloud.stream.binder.kafka.streams.RecordRecoverableProcessor.process(RecordRecoverableProcessor.java:90)
So, first question: do I really need to bring in both binders as dependencies and configure both in order to use Kafka Streams with the DltAwareProcessor, or did I misconfigure something?
Second question: if so, how do I get around that NPE? Looking at the code, it seems to happen because there is no output from the DLQ function.
Reputation: 5924
When using the DltAwareProcessor in the Spring Cloud Stream Kafka Streams binder, you need to use both the regular Kafka binder and the Kafka Streams binder. Here is a sample application that you can follow for the basic idea: https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/kafka-streams-samples/kafka-streams-recoverable. Look at the dependencies of that sample project.
The second issue is that you are using AVRO, and there are no corresponding AVRO message converters in the application for Spring Cloud Stream to use. Note that these converters are separate from any SerDes that you provide to Kafka Streams. You can fix this problem with either of the following options.
1. Use the AVRO serializers used by the SerDes and then enable native encoding in Spring Cloud Stream. Basically, set an AVRO value serializer via Kafka configuration (value.serializer) on the DLT output binding. Here are some details about this approach: https://docs.spring.io/spring-cloud-stream/reference/kafka/kafka_tips.html#use-native-serializer-and-deserializer-problem-statement
2. Provide an AVRO message converter through the MessageConverter API. See this issue comment where we provide a strategy for that: https://github.com/spring-cloud/spring-cloud-stream/issues/2881#issuecomment-1909237402
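As a rough illustration of the native-encoding approach, the DLT output binding could be configured along these lines. This is only a sketch under a few assumptions: the binding name process-dlq is taken from the destination passed to DltAwareProcessor in the question, the Confluent KafkaAvroSerializer and the SCHEMA_REGISTRY_URL placeholder are guesses about the setup, and the serializer should be whichever one your value SerDe actually wraps.

```yaml
spring:
  cloud:
    stream:
      bindings:
        process-dlq:
          producer:
            # Bypass Spring Cloud Stream's message converters for this binding
            # and let the Kafka serializer encode the payload.
            use-native-encoding: true
      kafka:
        bindings:
          process-dlq:
            producer:
              configuration:
                # Assumption: Confluent's Avro serializer; replace it with the
                # serializer that matches your value SerDe.
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                # Assumption: placeholder for your schema registry location.
                schema.registry.url: ${SCHEMA_REGISTRY_URL}
```

Because StreamBridge goes through the regular Kafka binder, these properties live under spring.cloud.stream.kafka.bindings rather than under the Kafka Streams binder's configuration. Depending on the key type, a matching key.serializer entry may be needed in the same configuration block.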