user60108

Reputation: 3462

Error handling - Consumer - Apache Kafka and Spring

I am learning to use Kafka. I have two services: a producer and a consumer.

The producer produces messages that require processing (queries to services and a database). These messages are received by the consumer, which is responsible for processing them and saving the result in a database.

Producer

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
...
kafkaTemplate.send(topic, message);

Consumer

@KafkaListener(topics = "....")
public void listen(@Payload String message) {
....
}

I would like all messages to be processed correctly by the consumer. I do not know how to handle errors on the consumer side in this context. For example, the database might be temporarily down and unable to handle certain messages.

What to do in these cases?

I know that the responsibility belongs to the consumer. I could retry, but retrying several times in a row while the database is down does not seem like a good idea. And if I keep consuming messages, the offset advances and I lose the events that I could not process.

Upvotes: 3

Views: 20160

Answers (2)

user60108

Reputation: 3462

This link was very helpful: https://dzone.com/articles/spring-for-apache-kafka-deep-dive-part-1-error-han

Spring provides the DeadLetterPublishingRecoverer class, which handles such failures correctly by publishing the failed record to a dead-letter topic.
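
For anyone looking for a concrete starting point, here is a minimal sketch of wiring that recoverer into an error handler. It assumes spring-kafka 2.8+ (where DefaultErrorHandler replaced SeekToCurrentErrorHandler) and an existing KafkaTemplate bean; the retry settings are illustrative.

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Failed records go to "<originalTopic>.DLT" by default
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, String> template) {
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
    // Retry each failed record 2 more times, 1 second apart, before publishing it
    return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2L));
}

Once registered on the listener container factory (e.g. factory.setCommonErrorHandler(errorHandler)), a record that keeps failing is retried a few times and then moved to a dead-letter topic, so the consumer does not get stuck and the event is not lost.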

Upvotes: 1

asolanki

Reputation: 1373

You control the Kafka consumer by committing the offsets of the records you have read. If an offset is not committed, the consumer will be given the same records again after a restart or rebalance. You can set offset committing to manual and, based on the success of your business logic, decide whether to commit. See the sample below.

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// Disable auto-commit so offsets are committed only after successful processing
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));

final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);   // application-specific persistence
        consumer.commitSync();  // commit only after the batch is safely stored
        buffer.clear();
    }
}

consumer.commitSync() commits the offsets returned by the last poll for all subscribed partitions.
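
If you need finer-grained control than committing everything returned by the last poll, commitSync also accepts explicit per-partition offsets. A minimal sketch, assuming record is the last successfully processed ConsumerRecord:

import java.util.Collections;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// The committed offset is the position of the *next* record to read, hence + 1
consumer.commitSync(Collections.singletonMap(
        new TopicPartition(record.topic(), record.partition()),
        new OffsetAndMetadata(record.offset() + 1)));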

Also see the Kafka consumer documentation to understand consumer offsets.

Upvotes: 3
