Rajasekhar

Reputation: 45

Requeue the failed record in the Kafka topic

I have a use case where records are to be persisted in a table that has a foreign key to itself.

Example:

zObject { uid, name, parentuid }

The parent uid is also present in the same table, and any object whose parentuid does not exist will fail to persist.
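For illustration only, the self-referencing constraint could be mapped like this (a sketch; the `ZObject` class name and JPA mapping are assumptions based on the fields named above, not code from the question):

```java
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.JoinColumn;
import javax.persistence.ManyToOne;

@Entity
public class ZObject {

    @Id
    private String uid;

    private String name;

    // Self-referencing foreign key: the parent row must already exist
    // in the same table, otherwise the insert fails with a
    // constraint violation.
    @ManyToOne
    @JoinColumn(name = "parentuid")
    private ZObject parent;
}
```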

At times the records are placed in the topic in such a way that a parent record is not at the head; instead it appears after the records that depend on it.

This causes the record to fail processing. I have used the SeekToCurrentErrorHandler, which retries the same failed record for the given backoff, but it keeps failing since the dependency is still not met.

Is there any way I can requeue the record at the end of the topic so that the dependency is met? If it still fails, say, 5 times even after requeueing, the record can be pushed to a DLT.

Thanks, Rajasekhar

Upvotes: 3

Views: 2208

Answers (1)

Gary Russell

Reputation: 174494

There is nothing built in; you can, however, use a custom destination resolver in the DeadLetterPublishingRecoverer to determine which topic to publish to, based on a header in the failed record.

See https://docs.spring.io/spring-kafka/docs/2.6.2/reference/html/#dead-letters

EDIT

import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.util.backoff.FixedBackOff;

@SpringBootApplication
public class So64646996Application {

    public static void main(String[] args) {
        SpringApplication.run(So64646996Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so64646996").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic dlt() {
        return TopicBuilder.name("so64646996.DLT").partitions(1).replicas(1).build();
    }

    @Bean
    public ErrorHandler eh(KafkaOperations<String, String> template) {
        return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template,
                (rec, ex) -> {
                    // Track the attempt count in a "retries" header; the Kafka
                    // Header type is fully qualified to avoid a clash with
                    // Spring's @Header annotation imported above.
                    org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
                    if (retries == null) {
                        retries = new RecordHeader("retries", new byte[] { 1 });
                        rec.headers().add(retries);
                    }
                    else {
                        retries.value()[0]++;
                    }
                    // After 5 attempts, give up and publish to the DLT;
                    // otherwise requeue at the end of the original topic.
                    return retries.value()[0] > 5
                            ? new TopicPartition("so64646996.DLT", rec.partition())
                            : new TopicPartition("so64646996", rec.partition());
                }), new FixedBackOff(0L, 0L));
    }

    @KafkaListener(id = "so64646996", topics = "so64646996")
    public void listen(String in,
            @Header(KafkaHeaders.OFFSET) long offset,
            @Header(name = "retries", required = false) byte[] retry) {

        // The header is absent on the first delivery, so guard against null.
        System.out.println(in + "@" + offset + ":" + (retry == null ? 0 : retry[0]));
        throw new IllegalStateException();
    }

    @KafkaListener(id = "so64646996.DLT", topics = "so64646996.DLT")
    public void listenDLT(String in,
            @Header(KafkaHeaders.OFFSET) long offset,
            @Header(name = "retries", required = false) byte[] retry) {

        System.out.println("DLT: " + in + "@" + offset + ":" + (retry == null ? 0 : retry[0]));
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> System.out.println(template.send("so64646996", "foo").get(10, TimeUnit.SECONDS)
                .getRecordMetadata());
    }

}

Upvotes: 4
