Pdv

Reputation: 145

Transferring to DLQ on Production Exception and Deserialization with Spring Cloud Stream Kafka Streams Binder

I've got a simple consumer for which I want to send the input message to a DLQ when anything goes wrong, either at deserialization time or during execution of the consumer. As described in the documentation, I'm using a DltAwareProcessor for production exceptions:

@Bean
public Consumer<KStream<Object, MyAvroModel>> process(DltPublishingContext dltSenderContext) {
    return input -> input
            .process(() -> new DltAwareProcessor<>(rec -> {
                log.info("RECEIVED {}", rec);
                // do stuff with the record
                return rec;
            }, "process-dlq", dltSenderContext));
}

and the following configuration in application.yml:

spring:
  cloud.stream:
    bindings:
      process-in-0:
        destination: process-topic
        group: process-group
        max-attempts: 1
        content-type: application/*+avro
    kafka:
      binder: 
        brokers: ${KAFKA_HOSTS}
      streams:
        bindings:
          process-in-0:
            consumer:
              useNativeDecoding: true
              enableDlq: true
              dlqName: process-topic-dlq
        binder:
          deserializationExceptionHandler: sendToDlq

The application starts correctly, but when the consumer fails, I get an exception:

stream-thread [process-group-18fbe5b4-f8a9-4192-85f2-ffec28756445-StreamThread-1] Failed to process stream task 0_33 due to the following error:
java.lang.IllegalStateException: A default binder has been requested, but there are no binders available for 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' : , and no default binder has been set.
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.doGetBinderConventional(DefaultBinderFactory.java:278)
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.doGetBinder(DefaultBinderFactory.java:180)
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinder(DefaultBinderFactory.java:167)
    at org.springframework.cloud.stream.binding.BindingService.getBinder(BindingService.java:419)
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:279)
    at org.springframework.cloud.stream.function.StreamBridge.resolveDestination(StreamBridge.java:272)
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:168)
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:147)
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:142)
    at org.springframework.cloud.stream.binder.kafka.streams.DltAwareProcessor.lambda$defaultProcessorRecordRecoverer$0(DltAwareProcessor.java:83)
    at org.springframework.cloud.stream.binder.kafka.streams.RecordRecoverableProcessor.process(RecordRecoverableProcessor.java:90)

I believe this is caused by the following behaviour described in the documentation:

This is because, DltAwareProcessor uses StreamBridge which uses the regular Kafka binder (message-channel based) which by default uses a ByteArraySerializer for keys.

I can work around that by adding the plain Kafka binder as a dependency and duplicating the Kafka configuration so the second binder is configured too.
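
Roughly, the duplicated part of my configuration looks like the sketch below (each binder reads its own brokers property):

spring:
  cloud.stream:
    kafka:
      binder:
        brokers: ${KAFKA_HOSTS}    # used by the message-channel binder (StreamBridge)
      streams:
        binder:
          brokers: ${KAFKA_HOSTS}  # used by the Kafka Streams binder

With that in place, the message is sent to the DLQ, but I then get an NPE: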

ERROR o.a.k.s.p.internals.TaskExecutor - stream-thread [process-group-306cc04c-9994-46bf-8174-7226eebe1aa2-StreamThread-1] Failed to process stream task 0_26 due to the following error:
java.lang.NullPointerException: Cannot invoke "Object.getClass()" because "result" is null
    at org.springframework.cloud.function.cloudevent.CloudEventsFunctionInvocationHelper.doPostProcessResult(CloudEventsFunctionInvocationHelper.java:138)
    at org.springframework.cloud.function.cloudevent.CloudEventsFunctionInvocationHelper.postProcessResult(CloudEventsFunctionInvocationHelper.java:114)
    at org.springframework.cloud.function.cloudevent.CloudEventsFunctionInvocationHelper.postProcessResult(CloudEventsFunctionInvocationHelper.java:48)
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:193)
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:147)
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:142)
    at org.springframework.cloud.stream.binder.kafka.streams.DltAwareProcessor.lambda$defaultProcessorRecordRecoverer$0(DltAwareProcessor.java:83)
    at org.springframework.cloud.stream.binder.kafka.streams.RecordRecoverableProcessor.process(RecordRecoverableProcessor.java:90)

So, first question: do I really need to bring in both binders as dependencies and configure both in order to use Kafka Streams with the DltAwareProcessor, or did I misconfigure something?

Second question: if so, how do I get around that NPE? Looking at the code, it seems to happen because the DLQ function produces no output.

Upvotes: 0

Views: 509

Answers (1)

sobychacko

Reputation: 5924

When using the DltAwareProcessor in the Spring Cloud Stream Kafka Streams binder, you need both the regular (message-channel) Kafka binder and the Kafka Streams binder. Here is a sample application that you can follow for the basic idea: https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/kafka-streams-samples/kafka-streams-recoverable. Look at the dependencies in that sample project.
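
Concretely, that means both binder artifacts are on the classpath (coordinates as published by Spring Cloud Stream; version management via the Spring Cloud BOM is assumed):

org.springframework.cloud:spring-cloud-stream-binder-kafka
org.springframework.cloud:spring-cloud-stream-binder-kafka-streams

The first brings in the message-channel binder that StreamBridge (and therefore DltAwareProcessor) publishes through; the second is the Kafka Streams binder that runs the processor itself.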

The second issue is that you are using Avro, and there are no corresponding Avro message converters in the application for Spring Cloud Stream to use. Note that this is different from any SerDes that you are providing to Kafka Streams: the SerDes are used inside the Kafka Streams topology, while message converters are used on the message-channel side, i.e. by StreamBridge when it publishes to the DLT. You can fix this problem via either of the following options.

  1. You can use the same underlying Kafka Avro serializers used by the SerDes and then enable native encoding in Spring Cloud Stream. Basically, set an Avro value serializer via Kafka configuration (value.serializer) on the DLT output binding; see the sketch after this list. Here are some details about this approach: https://docs.spring.io/spring-cloud-stream/reference/kafka/kafka_tips.html#use-native-serializer-and-deserializer-problem-statement
  2. Or you can use an appropriate Avro message converter that is compatible with Spring's MessageConverter API. See this issue comment, where we provide a strategy for that: https://github.com/spring-cloud/spring-cloud-stream/issues/2881#issuecomment-1909237402
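
For option 1, here is a minimal sketch of what that configuration might look like, assuming the Confluent Avro serializer and that the process-dlq destination used by the DltAwareProcessor can be configured like a regular output binding (the SCHEMA_REGISTRY_URL placeholder is illustrative):

spring:
  cloud.stream:
    bindings:
      process-dlq:
        producer:
          useNativeEncoding: true   # skip Spring's message converters for this binding
    kafka:
      bindings:
        process-dlq:
          producer:
            configuration:
              value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
              schema.registry.url: ${SCHEMA_REGISTRY_URL}

With native encoding enabled, StreamBridge hands the record to the configured Kafka serializer directly instead of looking for a message converter.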

Upvotes: 0
