Chad Showalter
Chad Showalter

Reputation: 401

Can I use a spring-kafka RecoveringBatchErrorHandler to handle deserialization exceptions?

I'd like a batch listener that commits offsets prior to a record that fails, logs the failed record, and then retrieves a new batch starting with the first offset after the failed record.

My current approach handles exceptions thrown in my listener's code, by throwing a BatchListenerFailedException that is handled by the RecoveringBatchErrorHandler as I intend. However, I would like to handle all exceptions in this way; that is, an exception thrown by the listener and any exception due to a deserialization failure. I'm using a BatchMessagingMessageConverter. I understand that I could use an ErrorHandlingDeserializer if the deserialization exception occurred in the Kafka Deserializer; however, deserialization exceptions occur with my configuration in the MessagingMessageConverter, which I believe is after the Kafka client BytesDeserializer has successfully deserialized my message. How can I best achieve my goal?

Here's my container factory config:

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
         ConsumerFactory<Object, Object> consumerFactory) {
    // Single-threaded batch container. The RecoveringBatchErrorHandler commits
    // offsets up to (not including) the record identified by a
    // BatchListenerFailedException, then retries per the back-off (2 attempts here).
    ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(consumerFactory);
    containerFactory.setBatchListener(true);
    containerFactory.setBatchErrorHandler(
            new RecoveringBatchErrorHandler(new FixedBackOff(FixedBackOff.DEFAULT_INTERVAL, 2)));
    // Records arrive as Bytes from the consumer; JSON conversion to the listener's
    // payload type happens in this converter, per message within the batch.
    containerFactory.setMessageConverter(
            new BatchMessagingMessageConverter(new BytesJsonMessageConverter()));
    containerFactory.setConcurrency(1);
    return containerFactory;
}

/**
 * Consumer factory for the batch listener.
 *
 * Fix: the {@code ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS} /
 * {@code VALUE_DESERIALIZER_CLASS} delegate properties are only read by
 * {@link ErrorHandlingDeserializer} itself. The original config set the key/value
 * deserializers directly to {@code BytesDeserializer}, so those two delegate
 * properties were silently ignored. Configuring {@code ErrorHandlingDeserializer}
 * as the deserializer (delegating to {@code BytesDeserializer}) makes the delegate
 * properties effective; runtime behavior is unchanged because the delegate is the
 * same deserializer as before.
 */
@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "pojo-group");
    // ErrorHandlingDeserializer wraps the real deserializer and, on failure, passes
    // a null value plus an exception header instead of throwing in the poll loop.
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, BytesDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, BytesDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // Offsets are committed by the container, not auto-committed by the client.
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return new DefaultKafkaConsumerFactory<>(props);
}

And here is my Listener:

@KafkaListener(id = "myKafkaListener", idIsGroup = false, autoStartup = "true", topics = {"pojo-topic"}, containerFactory = "kafkaListenerContainerFactory")
public void receive(List<Message<Pojo>> messages) {
    System.out.println("received " + messages.size() + " messages");
    // Tracks the index of the record currently being processed; on failure it is the
    // index reported to the batch error handler.
    int index = 0;
    try {
        // Any exception raised while processing a record is mapped to a
        // BatchListenerFailedException carrying that record's index.
        while (index < messages.size()) {
            Message<Pojo> message = messages.get(index);
            System.out.println("received: " + message.getPayload() + " at offset " + message.getHeaders().get(KafkaHeaders.OFFSET, Long.class));
            index++;
        }
    } catch (Exception e) {
        throw new BatchListenerFailedException("listener threw exception when processing batch", e, index);
    }
}

UPDATE

Here is the stack trace from when I send a string (just "A") instead of a json object, and deserialization fails:

org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'A': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"A"; line: 1, column: 2]
    at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:79) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.ContainerAwareBatchErrorHandler.handle(ContainerAwareBatchErrorHandler.java:56) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:2015) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1859) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1725) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1704) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1274) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1266) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.6.jar:2.7.6]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'A': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"A"; line: 1, column: 2]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2376) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2008) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:1978) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:1930) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1842) ~[spring-kafka-2.7.6.jar:2.7.6]
    ... 8 common frames omitted
Caused by: org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'A': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"A"; line: 1, column: 2]
    at org.springframework.kafka.support.converter.JsonMessageConverter.extractAndConvertValue(JsonMessageConverter.java:122) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:174) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:322) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:153) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:1988) ~[spring-kafka-2.7.6.jar:2.7.6]
    ... 11 common frames omitted
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'A': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"A"; line: 1, column: 2]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:720) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3593) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2688) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:870) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:762) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4684) ~[jackson-databind-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4586) ~[jackson-databind-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3643) ~[jackson-databind-2.12.4.jar:2.12.4]
    at org.springframework.kafka.support.converter.JsonMessageConverter.extractAndConvertValue(JsonMessageConverter.java:119) ~[spring-kafka-2.7.6.jar:2.7.6]
    ... 16 common frames omitted

Upvotes: 1

Views: 2672

Answers (1)

Gary Russell
Gary Russell

Reputation: 174494

Here are two solutions; the first uses the ErrorHandlingDeserializer and JsonDeserializer. The second is a work-around that uses a custom ByteArrayJsonMessageConverter. I have opened an issue to provide a more seamless solution in the batch listener adapter.

Example 1, using deserializer:

# Example 1 config: Boot auto-configures the consumer from these properties.
spring.kafka.consumer.auto-offset-reset=earliest
# ErrorHandlingDeserializer catches failures from its delegate and hands the listener
# a null value plus an exception header instead of throwing in the poll loop.
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.demo.Foo
# The actual JSON deserialization is delegated to JsonDeserializer.
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer

# Enable batch listeners container-wide.
spring.kafka.listener.type=batch
// Example 1 demo: deserialization failures are caught by the ErrorHandlingDeserializer
// (configured in application properties), which delivers a null payload plus an
// exception header to the batch listener instead of throwing in the poll loop.
@SpringBootApplication
public class So69055510Application {

    public static void main(String[] args) {
        SpringApplication.run(So69055510Application.class, args);
    }

    // Main topic (single partition, so the demo output order is deterministic).
    @Bean
    NewTopic topic() {
        return TopicBuilder.name("so69055510").partitions(1).replicas(1).build();
    }

    // Dead-letter topic targeted by the DeadLetterPublishingRecoverer below.
    @Bean
    NewTopic dlt() {
        return TopicBuilder.name("so69055510.DLT").partitions(1).replicas(1).build();
    }

    // Batch listener: a record that failed deserialization arrives as a null element
    // whose converted headers carry the deserialization exception. Throwing
    // BatchListenerFailedException with the element index tells the error handler
    // exactly which record to recover (publish to the DLT) before continuing.
    @KafkaListener(id = "so69055510", topics = "so69055510")
    void listen(List<Foo> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
        for (int i = 0; i < in.size(); i++) {
            Foo foo = in.get(i);
            if (foo == null
                    && headers.get(i).get(ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {

                throw new BatchListenerFailedException("deserialization error",
                        new DeserializationException("Batch listener", null, false, null), i);
            }
            System.out.println(foo);
        }
    }

    // DLT listener: the recovered value is the raw bytes of the bad record, so it is
    // read back here with a plain StringDeserializer.
    @KafkaListener(id = "so69055510.DLT", topics = "so69055510.DLT",
            properties = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG +
                ":org.apache.kafka.common.serialization.StringDeserializer")
    void listenDlt(String in) {
        System.out.println("DLT: " + in);
    }

    // Batch error handler that publishes the failed record to the DLT. The publishing
    // template uses a byte[] serializer so the original (undeserializable) value is
    // forwarded unchanged.
    @Bean
    BatchErrorHandler eh(ProducerFactory<String, byte[]> pf) {
        KafkaTemplate<String, byte[]> template = new KafkaTemplate<>(pf,
                Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
        RecoveringBatchErrorHandler eh = new RecoveringBatchErrorHandler(new DeadLetterPublishingRecoverer(template));
        eh.setLogLevel(Level.DEBUG);
        return eh;
    }

    // Drives the demo: two valid JSON records with one unparseable value between them.
    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so69055510", "{\"bar\":\"baz\"}");
            template.send("so69055510", "JUNK");
            template.send("so69055510", "{\"bar\":\"qux\"}");
        };
    }

}
Foo [bar=baz]
DLT: JUNK
Foo [bar=qux]

Example 2, using a custom message converter. Note that, for this work-around, you need some way to mark in your domain object that deserialization failed:

# Example 2 config: raw byte[] in and out; JSON conversion is performed later, in the
# custom message converter, rather than in the Kafka deserializer.
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
// Example 2 demo (work-around): conversion failures are caught inside a custom
// ByteArrayJsonMessageConverter, which substitutes a sentinel Foo so the batch
// listener can detect the bad record by inspecting the payload.
@SpringBootApplication
public class So69055510Application {

    public static void main(String[] args) {
        SpringApplication.run(So69055510Application.class, args);
    }

    // Main topic (single partition, so the demo output order is deterministic).
    @Bean
    NewTopic topic() {
        return TopicBuilder.name("so69055510").partitions(1).replicas(1).build();
    }

    // Dead-letter topic targeted by the DeadLetterPublishingRecoverer below.
    @Bean
    NewTopic dlt() {
        return TopicBuilder.name("so69055510.DLT").partitions(1).replicas(1).build();
    }

    // Batch listener: the sentinel bar value planted by the converter identifies the
    // record whose conversion failed; the thrown BatchListenerFailedException carries
    // its index so the error handler can recover just that record.
    @KafkaListener(id = "so69055510", topics = "so69055510")
    void listen(List<Foo> in) {
        for (int i = 0; i < in.size(); i++) {
            Foo foo = in.get(i);
            if (foo.getBar().equals("thisIsABadOne")) {
                throw new BatchListenerFailedException("deserialization error",
                        new DeserializationException("Batch listener", null, false, null), i);
            }
            System.out.println(foo);
        }
    }

    // DLT listener for records recovered by the error handler.
    @KafkaListener(id = "so69055510.DLT", topics = "so69055510.DLT")
    void listenDlt(String in) {
        System.out.println("DLT: " + in);
    }

    // Converter that never throws out of toMessage(): on a ConversionException it
    // returns a sentinel Foo instead, deferring failure handling to the listener.
    @Bean
    ByteArrayJsonMessageConverter converter() {
        return new ByteArrayJsonMessageConverter() {

            @Override
            public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
                    Consumer<?, ?> consumer, Type type) {

                try {
                    // Target type is hard-coded here; the passed-in Type is ignored.
                    return super.toMessage(record, acknowledgment, consumer, Foo.class); // <<<<<< type
                }
                catch (ConversionException ex) {
                    // Sentinel payload recognized by the listener above.
                    return MessageBuilder.withPayload(new Foo("thisIsABadOne"))
                            .build();
                }
            }

        };
    }

    // Batch error handler that publishes the failed record (raw byte[]) to the DLT.
    @Bean
    BatchErrorHandler eh(KafkaTemplate<String, byte[]> template) {
        RecoveringBatchErrorHandler eh = new RecoveringBatchErrorHandler(new DeadLetterPublishingRecoverer(template));
        eh.setLogLevel(Level.DEBUG);
        return eh;
    }

    // Drives the demo: two valid JSON records with one unparseable value between them.
    @Bean
    ApplicationRunner runner(KafkaTemplate<String, byte[]> template) {
        return args -> {
            template.send("so69055510", "{\"bar\":\"baz\"}".getBytes());
            template.send("so69055510", "JUNK".getBytes());
            template.send("so69055510", "{\"bar\":\"qux\"}".getBytes());
        };
    }

}
Foo [bar=baz]
DLT: JUNK
Foo [bar=qux]

Upvotes: 3

Related Questions