Sajin Surendran
Sajin Surendran

Reputation: 284

Handling exception in Spring Kafka

I am using spring-kafka 2.2.6. I have used SeekToCurrentErrorHandler and ErrorHandlingDeserializer2. SeekToCurrentErrorHandler currently configured to log message after three retries. Is there any way to skip retries for validation errors (caught by Validator implementation in Spring) and message conversion errors? All the errors are being intercepted by container error handler i.e SeeToCurrentErrorHandler. Should I override the handle method of SeeToCurrentErrorHandler?

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(this.kafkaConfigProperties.getConsumerConcurrency());
    factory.setAutoStartup(false);
    factory.setErrorHandler(new SeekToCurrentErrorHandler((c, e) -> {
        LOG.info(e.getMessage());
    }, this.kafkaConfigProperties.getRetryCount()));
    return factory;
}

 @Bean
public ConsumerFactory<String, Object> consumerFactory() {
    Map<String, Object> map = new HashMap<>();
    Properties consumerProperties = getConsumerProperties();
    consumerProperties.forEach((key, value) -> {
        map.put((String) key, value);
    });
    KafkaSoapMessageConverter kafkaSoapMessageConverter = new KafkaSoapMessageConverter();
    Map<String, Object> configMap = new HashMap<>(1);
    configMap.put(KafkaSoapMessageConverter.CLASS_TO_DESERIALIZE, MyClass.class);
    kafkaSoapMessageConverter.configure(configMap, false);
    ErrorHandlingDeserializer2<Object> errorHandlingDeserializer = new ErrorHandlingDeserializer2<>(
            kafkaSoapMessageConverter);
    DefaultKafkaConsumerFactory<String, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(map);
    consumerFactory.setValueDeserializer(errorHandlingDeserializer);
    return consumerFactory;
}

EDIT

I have used the below code

if(DeserializationException.class == e.getClass() 
        || e.getCause().getClass() == MethodArgumentNotValidException.class) {
    SeekUtils.doSeeks(records, consumer, e, true, (c, e) -> { return true; }, LOG); 
} else {
    super.handle(e, records, consumer, container);
}

Upvotes: 0

Views: 1817

Answers (1)

Gary Russell
Gary Russell

Reputation: 174554

Version 2.3 (current is 2.3.5) added the ability to configure which exceptions are retryable:

/**
 * Set an exception classifications to determine whether the exception should cause a retry
 * (until exhaustion) or not. If not, we go straight to the recoverer. By default,
 * the following exceptions will not be retried:
 * <ul>
 * <li>{@link DeserializationException}</li>
 * <li>{@link MessageConversionException}</li>
 * <li>{@link MethodArgumentResolutionException}</li>
 * <li>{@link NoSuchMethodException}</li>
 * <li>{@link ClassCastException}</li>
 * </ul>
 * All others will be retried.
 * When calling this method, the defaults will not be applied.
 * @param classifications the classifications.
 * @param defaultValue whether or not to retry non-matching exceptions.
 * @see BinaryExceptionClassifier#BinaryExceptionClassifier(Map, boolean)
 */
public void setClassifications(Map<Class<? extends Throwable>, Boolean> classifications, boolean defaultValue) {

This is how the defaults are set up:

    Map<Class<? extends Throwable>, Boolean> classified = new HashMap<>();
    classified.put(DeserializationException.class, false);
    classified.put(MessageConversionException.class, false);
    classified.put(MethodArgumentResolutionException.class, false);
    classified.put(NoSuchMethodException.class, false);
    classified.put(ClassCastException.class, false);

Also, you can add exceptions to the defaults:

/**
 * Add an exception type to the default list; if and only if an external classifier
 * has not been provided. By default, the following exceptions will not be retried:
 * <ul>
 * <li>{@link DeserializationException}</li>
 * <li>{@link MessageConversionException}</li>
 * <li>{@link MethodArgumentResolutionException}</li>
 * <li>{@link NoSuchMethodException}</li>
 * <li>{@link ClassCastException}</li>
 * </ul>
 * All others will be retried.
 * @param exceptionType the exception type.
 * @see #removeNotRetryableException(Class)
 * @see #setClassifications(Map, boolean)
 */
public void addNotRetryableException(Class<? extends Exception> exceptionType) {
    Assert.isTrue(this.classifier instanceof ExtendedBinaryExceptionClassifier,
            "Cannot add exception types to a supplied classifier");
    ((ExtendedBinaryExceptionClassifier) this.classifier).getClassified().put(exceptionType, false);
}

Upvotes: 1

Related Questions