Ankit Gautam

Reputation: 129

Spring cloud stream kafka consumer error handling and retries issues

I need help with an error-handling scenario in the Spring Cloud Stream Kafka binder. My application has a Java 8 consumer whose binding is specified in application.yaml. The consumer is written as:

@Bean
public Consumer<Message<Transaction>> doProcess() {

    return message -> {
        Transaction transaction = message.getPayload();

        // Deliberately fail to exercise the error-handling path
        if (true) {
            throw new RuntimeException("exception!! !!:)");
        }

        Acknowledgment acknowledgment = message.getHeaders()
                .get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
            System.out.println("Acknowledgment provided");
            acknowledgment.acknowledge();
        }
    };
}

application.yaml:

spring.application.name: appname
spring.cloud.stream:
  function.definition: doProcess
  kafka:
    default.consumer:
      startOffset: latest
      useNativeDecoding: true
    bindings:
      doProcess-in-0.consumer.autoCommitOffset: false
  bindings:
    doProcess-in-0:
      destination: kafka.input.topic.name
      group: appGroup
      content-type: application/*+avro
      consumer:
        autoCommitOffset: false

Now, I am struggling with error handling and have two issues:

  1. I am trying to manually ack the consumption of each message rather than setting autoCommitOffset to true. When I set autoCommitOffset to false and test the error scenario, I see odd behavior: whenever an exception is thrown, the message is retried n times, and this redelivery of the failed message continues even after a restart of the service (if the restart happens before the n retries are completed). Once the n retries are done, the message is not picked up again, even after a restart. Does that mean the consumer is committing the offset after the n retries/redeliveries of the message? That should not be the case, since autoCommitOffset is false.

    Note: I have not configured any dlq.

  2. We need to write a custom exception handler that can catch exceptions (errors in both application code and the framework) and send a notification to a user group via email in our AWS environment. But we are not able to find an error handler that can catch both types of exception. Something like extending SeekToCurrentErrorHandler, or any other listener that can be called on an error event.
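The behavior described in (1) can be modeled with a plain-Java sketch (a simplified illustration of seek-to-current error-handler semantics, not Spring Kafka's actual code; the names `processWithRetries` and `MAX_ATTEMPTS` are illustrative): the record is redelivered up to a fixed number of attempts, then handed to a recoverer, after which the container commits the offset regardless of the ack mode, which is why the record is never redelivered once the attempts are exhausted.

```java
import java.util.function.Consumer;

// Simplified model of the container's retry-then-recover behavior.
class RetrySimulation {

    static final int MAX_ATTEMPTS = 10;

    // Processes the record at 'offset' and returns the committed offset.
    static long processWithRetries(long offset, Consumer<Long> listener, Consumer<Long> recoverer) {
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            try {
                listener.accept(offset);
                return offset + 1;      // success: commit past the record
            } catch (RuntimeException e) {
                // error handler re-seeks; the record is redelivered
            }
        }
        recoverer.accept(offset);       // attempts exhausted: recover (log, DLQ, ...)
        return offset + 1;              // offset is committed anyway -> no redelivery
    }

    public static void main(String[] args) {
        long committed = processWithRetries(42L,
                o -> { throw new RuntimeException("exception!!"); },
                o -> System.out.println("recovered offset " + o));
        System.out.println("committed=" + committed);
    }
}
```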

Edit:

As per the solution provided by Gary, the following beans can be used to configure the custom error handler:

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer> MQLCC() {
    System.out.println(String.format("DEBUG: Bean %s has been created.", "MQLCC"));
    return new ListenerContainerCustomizerCustom();
}

private static class ListenerContainerCustomizerCustom implements ListenerContainerCustomizer<AbstractMessageListenerContainer> {

    @Override
    public void configure(AbstractMessageListenerContainer container, String destinationName, String group) {
        System.out.println(String.format("HELLO from container %s, destination: %s, group: %s", container, destinationName, group));
    }
}

Upvotes: 1

Views: 5985

Answers (1)

Gary Russell

Reputation: 174494

The default error handler in the listener container will retry 10 times and then log the error and discard the record; for different behavior you need to configure a custom error handler and recovery strategy. Use a ListenerContainerCustomizer bean to configure the container.
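A sketch of such a customizer (assuming spring-kafka 2.8+, where `DefaultErrorHandler` replaced `SeekToCurrentErrorHandler`; the `sendFailureEmail` helper is hypothetical, not a Spring API):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class ErrorHandlingConfig {

    // Install a custom error handler on every binder-created container.
    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> errorHandlerCustomizer() {
        return (container, destination, group) -> {
            DefaultErrorHandler handler = new DefaultErrorHandler(
                    // Recovery strategy after retries are exhausted: this is where
                    // the email notification could be sent (or a
                    // DeadLetterPublishingRecoverer used instead).
                    (ConsumerRecord<?, ?> record, Exception exception) ->
                            sendFailureEmail(destination, record, exception),
                    // 2 retries after the initial attempt, 1 second apart.
                    new FixedBackOff(1000L, 2L));
            container.setCommonErrorHandler(handler);
        };
    }

    private void sendFailureEmail(String destination, ConsumerRecord<?, ?> record, Exception e) {
        // Placeholder: wire in AWS SES or similar here.
        System.out.println("Failed record on " + destination + ": " + e.getMessage());
    }
}
```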

See https://docs.spring.io/spring-kafka/docs/current/reference/html/#default-eh and https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters

(2.8 and later)

or https://docs.spring.io/spring-kafka/docs/2.7.x/reference/html/#seek-to-current and https://docs.spring.io/spring-kafka/docs/2.7.x/reference/html/#dead-letters

for earlier versions.
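Note also that the binder puts its own retry template in front of the container (three attempts by default), so the observed retries are a combination of the two layers. To let the container's error handler alone control redelivery, the binder retry can be disabled (a configuration sketch, assuming Spring Cloud Stream defaults):

```yaml
spring.cloud.stream:
  bindings:
    doProcess-in-0:
      consumer:
        # Disable binder-level retries so the container error handler
        # (and its recoverer) fully controls redelivery.
        max-attempts: 1
```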

Upvotes: 2
