Reputation: 45
I have a use case where the records are to be persisted in table which has foriegn key to itself.
Example:
zObject { uid, name, parentuid }
parent uid also present in same table and any object which has non existent parentuid will be failed to persist .
At times the records are placed in the topic such a way that the dependency is not at the head of the list , instead it will be after the dependent records are present
This will cause failure in process the record . I have used the seektocurrenterrorhandler which actually retries the same failed records for the given backoff and it fails since the dependency is not met .
Is there any way where I can requeue the record at the end of the topic so that dependency is met ? If it fails for day 5 times even after enqueue , the records can be pushed to a DLT .
Thanks, Rajasekhar
Upvotes: 3
Views: 2208
Reputation: 174494
There is nothing built in; you can, however, use a custom destination resolver in the DeadLetterPublishingRecoverer
to determine which topic to publish to, based on a header in the failed record.
See https://docs.spring.io/spring-kafka/docs/2.6.2/reference/html/#dead-letters
EDIT
@SpringBootApplication
public class So64646996Application {
public static void main(String[] args) {
SpringApplication.run(So64646996Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so64646996").partitions(1).replicas(1).build();
}
@Bean
public NewTopic dlt() {
return TopicBuilder.name("so64646996.DLT").partitions(1).replicas(1).build();
}
@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template,
(rec, ex) -> {
org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
if (retries == null) {
retries = new RecordHeader("retries", new byte[] { 1 });
rec.headers().add(retries);
}
else {
retries.value()[0]++;
}
return retries.value()[0] > 5
? new TopicPartition("so64646996.DLT", rec.partition())
: new TopicPartition("so64646996", rec.partition());
}), new FixedBackOff(0L, 0L));
}
@KafkaListener(id = "so64646996", topics = "so64646996")
public void listen(String in,
@Header(KafkaHeaders.OFFSET) long offset,
@Header(name = "retries", required = false) byte[] retry) {
System.out.println(in + "@" + offset + ":" + retry[0]);
throw new IllegalStateException();
}
@KafkaListener(id = "so64646996.DLT", topics = "so64646996.DLT")
public void listenDLT(String in,
@Header(KafkaHeaders.OFFSET) long offset,
@Header(name = "retries", required = false) byte[] retry) {
System.out.println("DLT: " + in + "@" + offset + ":" + retry[0]);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> System.out.println(template.send("so64646996", "foo").get(10, TimeUnit.SECONDS)
.getRecordMetadata());
}
}
Upvotes: 4