Martin Mucha

Reputation: 3091

transactional behavior in spring-kafka

I have read the spring-kafka/kafka documentation back and forth, and still cannot find a way to get proper transactional behavior with error recovery. I believe this is not a trivial question, so please read to the end. I believe this whole question revolves around finding a way to reposition over a failing record, or to ack in an error handler. But maybe there are better ways, I don't know.

So records are flowing in, and some of them are invalid. What I would like to have as a minimal solution is (and I will then fix several problems you probably see as well):

1) We cannot afford the luxury of stopping production in case of some trivial mishap, like one or a few invalid records. Thus if there is an invalid record in a kafka topic, I would like to log it, or resend it to a different queue, but then proceed with processing the following records.

2) There are permanent and temporary failures. A permanent failure is a record that cannot be deserialized, or a record failing data validation. In this case, I'd like to skip the invalid record, as discussed in 1). A temporary failure might be some specific exception or state, for example database connection errors, network issues etc. In this case, we do not want to skip the failing record; we want to retry, after some delay.

The subject of this question is ONLY implementing the skip/don't-skip behavior.

Let's say that this is our starting point:

private Map<String, Object> createKafkaConsumerFactoryProperties(String bootstrapServers, String groupId, Class<?> valueDeserializerClass) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

    return props;
}

@Bean(name="SomeFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
    @Value("${…}") String bootstrapServers,
    @Value("${…}") String groupId) {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

    ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(
            createKafkaConsumerFactoryProperties(bootstrapServers, groupId, AvroDeserializer.class),
            new StringDeserializer(),
            new AvroDeserializer(SomeClass.class));

    factory.setConsumerFactory(consumerFactory);
    // factory.setConcurrency(2);
    // factory.setBatchListener(true);
    return factory;
}

and we have listener like:

@KafkaListener(topics = "${…}", containerFactory = "SomeFactory")
public void receive(@Valid List<SomeClass> messageList) {/*logic*/}

Now, if I understand correctly, here is how this behaves:

So what is the solution to this madness? Well, the best I can come up with right now is pretty ugly: do not fail in the deserializer (bad), do not accept a specific type in the listener (bad), filter out KafkaNulls manually (bad), and finally trigger bean validation manually (bad). Is there a better way? Thanks for any explanation; I'd be grateful for every hint or direction on how to achieve this.

Upvotes: 0

Views: 588

Answers (1)

Gary Russell

Reputation: 174554

See the documentation for the upcoming 2.2 release (due tomorrow).

The DefaultAfterRollbackProcessor (when using transactions) and SeekToCurrentErrorHandler (when not using transactions) can now recover (skip) records that keep failing, and will do so after 10 failures, by default. They can be configured to publish failed records to a dead-letter topic.
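As a rough illustration of the non-transactional case, a container factory could be given a `SeekToCurrentErrorHandler` that skips a record after a configurable number of failures and publishes it to a dead-letter topic. This is only a sketch against the 2.2-era API (the constructor taking a recoverer and a max-failures count); the `kafkaTemplate` and `consumerFactory` beans are assumed to exist elsewhere in your configuration.

```java
// Sketch, assuming Spring Kafka 2.2+. The kafkaTemplate and consumerFactory
// beans are hypothetical and assumed to be configured elsewhere.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> recoveringFactory(
        ConsumerFactory<String, String> consumerFactory,
        KafkaTemplate<Object, Object> kafkaTemplate) {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);

    // After retries are exhausted, publish the failed record to <topic>.DLT.
    DeadLetterPublishingRecoverer recoverer =
            new DeadLetterPublishingRecoverer(kafkaTemplate);

    // Seek back to the failed record and retry; after 3 delivery attempts,
    // hand the record to the recoverer and move on (i.e., skip it).
    factory.setErrorHandler(new SeekToCurrentErrorHandler(recoverer, 3));
    return factory;
}
```

When transactions are enabled, the analogous piece would be a `DefaultAfterRollbackProcessor` set on the container properties instead of an error handler.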

Also see the Error Handling Deserializer which catches deserialization problems and passes them to the container so they can be sent to the error handler.
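For the deserialization side, the idea is to wrap the real deserializer so that a failure produces a special value handed to the error handler instead of poisoning the consumer. A sketch of the consumer properties, assuming the 2.2-era class name `ErrorHandlingDeserializer2` (renamed `ErrorHandlingDeserializer` in later versions) and the `AvroDeserializer` from the question:

```java
// Configuration sketch only; ErrorHandlingDeserializer2 is the 2.2-era name.
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

// The wrapper catches deserialization exceptions and passes them to the
// container, so the error handler (or after-rollback processor) sees them.
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);

// The real deserializer is delegated to via this property.
props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, AvroDeserializer.class.getName());
```

With this in place, a record that fails deserialization reaches the error handler like any other failure, so the skip/dead-letter logic above applies to it too.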

Upvotes: 0
